Oleg Pyzhcov
@oleg-py
but I'm down to have them yeeted
Alexandru Nedelcu
@alexandru

@Avasil I don't understand what's going on here:

[error] monix-eval: Failed binary compatibility check against io.monix:monix-eval_2.13:3.3.0! Found 6 potential problems
[error]  * static method apply(Boolean,Int,Int)monix.eval.tracing.PrintingOptions in class monix.eval.tracing.PrintingOptions does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("monix.eval.tracing.PrintingOptions.apply")
[error]  * method copy(Boolean,Int,Int)monix.eval.tracing.PrintingOptions in class monix.eval.tracing.PrintingOptions does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("monix.eval.tracing.PrintingOptions.copy")
[error]  * synthetic method copy$default$1()Boolean in class monix.eval.tracing.PrintingOptions does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("monix.eval.tracing.PrintingOptions.copy$default$1")
[error]  * synthetic method copy$default$2()Int in class monix.eval.tracing.PrintingOptions does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("monix.eval.tracing.PrintingOptions.copy$default$2")
[error]  * synthetic method copy$default$3()Int in class monix.eval.tracing.PrintingOptions does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("monix.eval.tracing.PrintingOptions.copy$default$3")
[error]  * method apply(Boolean,Int,Int)monix.eval.tracing.PrintingOptions in object monix.eval.tracing.PrintingOptions does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("monix.eval.tracing.PrintingOptions.apply")
[error] stack trace is suppressed; run last evalJVM / mimaReportBinaryIssues for the full output

In the Dotty PR. The PrintingOptions definition hasn't changed. The Scala version hasn't changed either.

Ah shit, it might be the -Xsource:3 I added.
Jasper Moeys
@Jasper-M
yes, probably related to private case class constructors
Alec Zorab
@AlecZorab
is there any prior art on something that looks like ReplaySubject but caches to disk? I can probably just mangle ReplaySubject, but I doubt I'm the first person who's wanted to do this
Alexandru Nedelcu
@alexandru
We don't have any disk caching, but it would be cool to know what you're thinking of, or to see the implementation you come up with.
Alexandru Nedelcu
@alexandru

The Scala 3 (Dotty) PR is ready for review:

monix/monix#1323

dinosaur
@dinosaur-lin

@Avasil As discussed, parMap2 and parZip2 are leaking exceptions: they throw uncaught exceptions when multiple errors happen. I understand we can try to redeem each task individually, as you suggested, but that is not exactly what I want, since most of the time we don't care about each individual error. It turns out this may be exactly what I want:

def safeParZip2(task1: Task[Unit], task2: Task[Unit]): Task[Unit] =
  for {
    f1 <- task1.start // start both fibers before joining,
    f2 <- task2.start // so the two tasks run in parallel
    _  <- f1.join
    _  <- f2.join
  } yield ()

It doesn't throw uncaught exceptions, the API is exactly like parZip2 or parMap2, and the tasks run in parallel. Does it make sense? Is it already available in Monix, or could it be considered as a potential addition to the Task API?

Trần Tiến Đức
@trantienduchn

Can someone explain to me why this happens? I was expecting OP3 to run under io1; a pointer to documentation would be even better. My understanding: subscribeOn(A) makes the upstream use scheduler A, and observeOn(A) makes the downstream use scheduler A. But here it happens across a flatMap, which confuses me.

      Observable(1)
        .map(i => {println(Thread.currentThread().getName + ": 1 : " + i); i + 1}) // (OP1)
        .flatMap(i => Observable.eval{println(Thread.currentThread().getName + ": 2 : " + i); i + 1}.subscribeOn(io2)) // (OP2)
        .subscribeOn(io1)
        .map(i => {println(Thread.currentThread().getName + ": 3 : " + i); i + 1}) // (OP3)
        .subscribeOn(io1)
        .toListL
        .runSyncUnsafe()

This is the result:

io-1-19: 1 : 1
io-2-20: 2 : 2
io-2-20: 3 : 3

I was expecting io-1-19: 3 : 3 instead of io-2-20: 3 : 3 (the third line)

Piotr Gawryś
@Avasil

@dinosaur-lin
In your implementation, if task1 and task2 both fail, the task2 failure will silently disappear into thin air. In the case of parZip2, the other error is at least reported to the scheduler (printed by default), so we are always notified about the error in some way. Compare the following code for safeParZip2 and Task.parZip2 (Thread.sleep emulates a synchronous operation that ends with an error):

implicit val s = Scheduler.global

Task.parZip2(
  Task.suspend { Thread.sleep(100); Task.raiseError(new Exception("task 1")) },
  Task.suspend { Thread.sleep(100); Task.raiseError(new Exception("task 2")) }
).attempt.runSyncUnsafe()

It doesn't throw uncaught exceptions

Task.parZip2 doesn't rethrow uncaught exceptions either; it only reports them to the Scheduler. I have no clue why it would kill your threads. Are you using a custom UncaughtExceptionReporter that is doing something shady?
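
For illustration, a minimal sketch (reusing the safeParZip2 definition and the implicit Scheduler from above) of how the second error gets lost when both tasks fail:

    // f1.join rethrows "task 1" and short-circuits the for-comprehension,
    // so f2's "task 2" error is never joined and never reported anywhere
    safeParZip2(
      Task.raiseError(new Exception("task 1")),
      Task.raiseError(new Exception("task 2"))
    ).attempt.runSyncUnsafe()
    // => Left(java.lang.Exception: task 1)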

Piotr Gawryś
@Avasil

@trantienduchn
I think the confusion lies with how flatMap works on streams. It could use more documentation.

You subscribe to Observable(1) and the first map runs on io1, due to subscribeOn(io1), as you expect.
Then, for each emitted element, you generate a new Observable that is subscribed on io2.
All downstream operations receive elements from the new stream (the result of flatMap), which started on io2.

Let me know if it's still confusing
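
For reference, a minimal sketch of those three steps, assuming io1 and io2 are Scheduler instances (e.g. created with Scheduler.io):

    // subscribeOn(io1) determines where the subscription happens, so the
    // source and the operators before the flatMap run on io1; each inner
    // Observable is subscribed on io2, and since downstream operators
    // receive elements from the flatMap's result, they keep running on io2
    Observable(1)
      .map(i => i + 1)                                       // on io1
      .flatMap(i => Observable.eval(i + 1).subscribeOn(io2)) // inner stream on io2
      .map(i => i + 1)                                       // still on io2
      .subscribeOn(io1)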

dinosaur
@dinosaur-lin

@Avasil In our code, the custom exception reporter doesn't do anything except reporting:

val reporter = UncaughtExceptionReporter{t => logger.error(s"Uncaught exception: ${t.getMessage}\n\t${t.getStackTrace.mkString("\n\t")}") }

It looks like this. So in your mind, the exception is just reported to the Scheduler and doesn't kill threads? That's good news to me. But I do see in some cases that the thread count decreases in JConsole. In fact, a lot of Java documentation says an uncaught exception will kill the thread. For example:

https://www.javamex.com/tutorials/exceptions/exceptions_uncaught_handler.shtml

 When an uncaught exception occurs, the JVM does the following:

    it calls a special private method, dispatchUncaughtException(), on the Thread class in which the exception occurs;
    it then terminates the thread in which the exception occurred.

But usually the thread pool will start new threads when threads are killed, as long as execute is used to run tasks on the pool, which is the case for Monix's fork-join thread pool. So usually it shouldn't matter too much, unless too many threads are killed at the same time, which might cause the hang. We actually increased the max thread count to 1024 and it survived the high volume last Friday.

Obviously, this might just be my guess, and in the usual scenario an uncaught exception doesn't kill threads in Monix, as you suggested. But do you know in what cases Monix might produce an uncaught exception in the underlying code that could kill threads? We'll try to avoid them as much as possible...

Rohan Sircar
@rohan-sircar
wouldn't a top-level exception handler solve this?
Like in your main:

    for {
      ..
      _ <- MyApp(..).run.onErrorHandleWith(...)
    } yield ()
Alexandru Nedelcu
@alexandru
@dinosaur-lin what kind of Scheduler are you using? A fork-join?
Alexandru Nedelcu
@alexandru

@dinosaur-lin I don't remember exactly, but there may be cases in which we are unable to catch exceptions. We try catching them in execute, but even there we use NonFatal filtering. We also try installing that reporter as the thread pool's exception handler, but only when we can, so it depends on the implementation (e.g. when you wrap another ExecutionContext, you don't get access to the underlying thread pool's configuration).

On your task parallelism example using parMap2, note that it's not the Scheduler that's catching those errors; the Scheduler is only in charge of logging. It's the parMap2 implementation that catches the errors and logs them. So it wouldn't make sense for threads to get killed in that instance.

Note that the safeParZip2 implementation you pasted is probably unsafe. Fibers are problematic in general, as they can be fire-and-forget. Canceling the resulting Task can create a leak; to be used safely, that .start should always happen in the context of a bracket.
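
A minimal sketch of that pattern (safeStart is a hypothetical helper name), using Task.bracket so the fiber gets cancelled even when use fails or the whole Task is cancelled:

    import monix.eval.{Fiber, Task}

    // acquire the fiber with .start and release it with .cancel; bracket
    // guarantees the release step runs, so the started task cannot leak
    def safeStart[A, B](task: Task[A])(use: Fiber[A] => Task[B]): Task[B] =
      task.start.bracket(use)(_.cancel)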

Oleg Pyzhcov
@oleg-py
Why for the love of sanity do you even have 1024 threads?
@dinosaur-lin
If you don't have 1024 cores, you should probably be using two pools, one for computation and one for blocking
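
For reference, a minimal sketch of that two-pool setup with Monix schedulers (blockingCall is a hypothetical placeholder):

    import monix.eval.Task
    import monix.execution.Scheduler

    // bounded pool sized to the available CPUs, for computation
    implicit val computation: Scheduler = Scheduler.computation()
    // unbounded cached pool, for blocking I/O
    val io: Scheduler = Scheduler.io(name = "app-io")

    // run the blocking part on the io pool
    val task = Task(blockingCall()).executeOn(io)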
Rohan Sircar
@rohan-sircar
:D
his problem is that his threads keep getting killed as he says
Alexandru Nedelcu
@alexandru
It's the max-threads count, not the active count. A fork-join pool can increase the active count, up to a max, when blocking operations happen. Maybe the threads keep getting killed when unused.
Oleg Pyzhcov
@oleg-py
^ this
or they can be killed because somebody throws an exception that NonFatal doesn't catch
like sttp that just yeets InterruptedException on cancellation :D
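
For context, scala.util.control.NonFatal explicitly treats InterruptedException as fatal, so a catch { case NonFatal(e) => ... } handler lets it propagate and kill the worker thread:

    import scala.util.control.NonFatal

    // NonFatal matches ordinary exceptions, but not InterruptedException,
    // VirtualMachineError, ThreadDeath, LinkageError or ControlThrowable
    assert(NonFatal(new RuntimeException))
    assert(!NonFatal(new InterruptedException))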
Alexandru Nedelcu
@alexandru
I think we have to fix that part. NonFatal is maybe not a good idea after all. Outside of OOMs, I don't see a problem with catching other Throwables. And InterruptedException should be dealt with.
Oleg Pyzhcov
@oleg-py
I agree. I'd even be down to have cats work like that overall. The issue is that it wouldn't be consistent with what the stdlib's Future does.
Alexandru Nedelcu
@alexandru

@oleg-py you opened an issue some time ago, which we ignored :)
monix/monix#924

And this interesting discussion on InterruptedException was linked:
scala/bug#8938

For one, it seems like InterruptedException is a breach of protocol, as it's not supposed to be thrown asynchronously. But mistakes and protocol violations happen 🤷‍♂️
Oleg Pyzhcov
@oleg-py
Yeah, and they are now wrapping it and Errors in ExecutionException
Trần Tiến Đức
@trantienduchn

@trantienduchn
I think the confusion lies with how flatMap works on streams. It could use more documentation.

You subscribe to Observable(1) and the first map runs on io1, due to subscribeOn(io1), as you expect.
Then, for each emitted element, you generate a new Observable that is subscribed on io2.
All downstream operations receive elements from the new stream (the result of flatMap), which started on io2.

Let me know if it's still confusing

@Avasil Hi, thank you so much for the clear answer! Now I understand flatMap better. So what should I do if I want the flatMap to run on io2 only and keep the other maps on io1? The real-world scenario is that I want the other maps to run on a CPU-bound scheduler and the flatMap on an IO scheduler.

Rohan Sircar
@rohan-sircar
will Monix be sticking with the global Scala fork-join pool, or ditch it in favor of a CE3-style work-stealing fixed-size pool?
Piotr Gawryś
@Avasil
@trantienduchn I think flatMap(...).observeOn(io1).map(...) should work
Trần Tiến Đức
@trantienduchn

@trantienduchn I think flatMap(...).observeOn(io1).map(...) should work

makes sense! Thanks
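
For reference, a minimal sketch of the suggested fix applied to the earlier snippet; observeOn(io1) shifts only the downstream back, so OP3 runs on io1 while the inner stream still subscribes on io2:

    Observable(1)
      .map(i => i + 1)                                       // (OP1) on io1
      .flatMap(i => Observable.eval(i + 1).subscribeOn(io2)) // (OP2) on io2
      .observeOn(io1)                                        // shift downstream to io1
      .map(i => i + 1)                                       // (OP3) on io1
      .subscribeOn(io1)
      .toListL
      .runSyncUnsafe()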

Alexandru Nedelcu
@alexandru

will Monix be sticking with the global Scala fork-join pool, or ditch it in favor of a CE3-style work-stealing fixed-size pool?

@rohan-sircar we haven't decided yet. Scala's fork-join pool is good for executing blocking tasks alongside CPU-bound ones. We also have an issue for improving Scheduler with a blocking operation, and we'll decide from there.
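
For context, what makes Scala's fork-join pool good at this is the BlockContext machinery behind scala.concurrent.blocking; a minimal sketch of marking a blocking section so the pool can compensate:

    import scala.concurrent.blocking

    // inside a fork-join worker, blocking { ... } signals the pool (via
    // BlockContext) that the thread is about to block, letting it spin up
    // a compensating worker thread up to the configured maximum
    def readFile(path: java.nio.file.Path): Array[Byte] =
      blocking { java.nio.file.Files.readAllBytes(path) }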

Alexandru Nedelcu
@alexandru
I've created a new issue: monix/monix#1432
Oleg Pyzhcov
@oleg-py
or - hear me out - we can drop Task and use c.e.IO :-P
Alexandru Nedelcu
@alexandru
:))
Alexandru Nedelcu
@alexandru
Even if that happened, Scheduler is awesome and here to stay, and I still want the work-stealing stuff in Monix too.
Rohan Sircar
@rohan-sircar
Yep, I saw that. I think Task.blocking is much more ergonomic than carrying an io scheduler around, and that naturally leads to specializing the main thread pool for async tasks as the next step, since dealing with blocking tasks becomes much simpler.
Also, now that I remember, there was a project where I wrapped a better-files file watcher in an Observable, and no matter how much I tried to catch the exception, it always threw Caused by: java.nio.file.ClosedWatchServiceException on application exit. Now I know why...
Alexandru Nedelcu
@alexandru

Task.blocking is much more ergonomic than carrying an io scheduler around

Yes, but that's the building block. We start from the Scheduler and work our way up.

java.nio.file.ClosedWatchServiceException

If you're referring to NonFatal, that doesn't sound like a fatal exception. Might have been something else.

cc @rohan-sircar
Rohan Sircar
@rohan-sircar
Oh, I wasn't disagreeing with scheduler#blocking ^^", just saying it might make sense to switch from the global Scala pool to a work-stealing pool

If you're referring to NonFatal, that doesn't sound like a fatal exception. Might have been something else.

Okay... need to look into it further then

Alexandru Nedelcu
@alexandru
@rohan-sircar ah, OK, it got too late last night 🙂
dinosaur
@dinosaur-lin

@Avasil @alexandru We encountered another issue in prod today. This time, we were able to capture the thread dump, as suggested by @Avasil. majik-scheduler is the fork-join scheduler we created.

"majik-scheduler-1245" #1245 daemon prio=5 os_prio=0 tid=0x00007f850000b800 nid=0x699c waiting on condition [0x00007f851a6e7000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000006a72e5ad0> (a monix.execution.internal.forkJoin.AdaptedForkJoinPool)
    at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
    - None

Many of the above.

"majik-scheduler-1238" #1238 daemon prio=5 os_prio=0 tid=0x00007f850004c800 nid=0x690e waiting on condition [0x00007f8519cdd000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000006a72e5ad0> (a monix.execution.internal.forkJoin.AdaptedForkJoinPool)
    at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
    - None

Occasionally this; it seems to be Monix's own thread pool:

"monix-scheduler-367" #367 daemon prio=5 os_prio=0 tid=0x00007f8530020800 nid=0x2be2 waiting on condition [0x00007f85bc4c0000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000006be727628> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - None

No deadlock found in the thread dump, but are these threads waiting for something forever?

dinosaur
@dinosaur-lin
https://www.fatalerrors.org/a/excutors-framework-of-j.u.c-implementation-of-fork-join-framework.html, it seems awaitWork means a worker thread is waiting to get a task to run, so there seem to be many idle threads in the pool...
Oleg Pyzhcov
@oleg-py
yeah you might be waiting on a task that never completes for some reason
like cancelling and then joining a fiber
or hitting some bug like #1364