@oleg-py you opened an issue some time ago, which we ignored :)
monix/monix#924
And this interesting discussion on InterruptedException
was linked:
scala/bug#8938
InterruptedException
is like a breach of protocol, as it's not supposed to be thrown async. But mistakes and protocol violations happen 🤷♂️
@trantienduchn
I think the confusion lies with howflatMap
works on streams. It could use more documentation.You subscribe to
Observable(1)
and do the firstmap
onio1
due tosubscribeOn(io1)
as you expect.
Then, for each emitted element, you generate a newObservable
that is subscribed onio2
.
All downstream operations receive elements from the new stream (the result offlatMap
) that started withio2
.Let me know if it's still confusing
@Avasil Hi thank you so much for the clear answer! Now I know better about flatMap. So if I want the flatMap run on io2
only and keep io1
on the other map
. What should I do? The real world scenario here is that I want other map
run on CPU bounded scheduler and flatMap
run on an IO scheduler.
will Monix be sticking to the global scala fork-join pool or ditch it in favor of CE3 style work stealing fixed size pool?
@rohan-sircar we haven't decided yet. Scala's fork-join is good for executing blocking tasks alongside the CPU-bounded ones. We also have an issue for improving Scheduler
with a blocking
operation and we'll decide from there.
Caused by: java.nio.file.ClosedWatchServiceException
on application exit. Now I know why...
Task.blocking is much more ergonomic than carrying an io scheduler around
Yes, but that's the building block. We start from the Scheduler and work our way up.
java.nio.file.ClosedWatchServiceException
If you're referring to NonFatal
, that doesn't sound like a fatal exception. Might have been something else.
If you're referring to NonFatal, that doesn't sound like a fatal exception. Might have been something else.
Okay.. need to look into it further then
@Avasil @alexandru We encounter another issue today in prod. this time, we are able to capture the thread dump as suggested by @Avasil . majik-scheduler
is the fork-join scheduler created.
"majik-scheduler-1245" #1245 daemon prio=5 os_prio=0 tid=0x00007f850000b800 nid=0x699c waiting on condition [0x00007f851a6e7000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006a72e5ad0> (a monix.execution.internal.forkJoin.AdaptedForkJoinPool)
at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Locked ownable synchronizers:
- None
Many of above
"majik-scheduler-1238" #1238 daemon prio=5 os_prio=0 tid=0x00007f850004c800 nid=0x690e waiting on condition [0x00007f8519cdd000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006a72e5ad0> (a monix.execution.internal.forkJoin.AdaptedForkJoinPool)
at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Locked ownable synchronizers:
- None
Occasionally this,, this seems to be monix's own thread pool
"monix-scheduler-367" #367 daemon prio=5 os_prio=0 tid=0x00007f8530020800 nid=0x2be2 waiting on condition [0x00007f85bc4c0000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006be727628> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
No dead lock find in the thread dump, but are these thread waiting for something forever?
awaitWork
is worker thread waiting to get a task to run. so it seems there are many idle threads in the pool...
switchMap
code with mulitple level of combineLatestMap4
. However, looking closely at our code, it is hard to see how the deadlock happening, but it did give up some hint to add more logging at relevant locations to eventually understand how the deadlock might happen.
def testColdHotObservable1(src: Observable[Long]): Task[Unit] = {
src.consumeWith(Consumer.foreach[Long](i => println(s"src1 $i")))
}
def testColdHotObservable2(src: Observable[Long]): Task[Unit] = {
src.consumeWith(Consumer.foreach[Long](i => println(s"src2 $i"))).delayExecution(3.seconds)
}
If running as hot observable:
val obs = Observable.interval(1.second).publish.refCount
Task.parZip2(testColdHotObservable1(obs), testColdHotObservable2(obs)).runToFuture
it print as expected, where the src2 come up after 3 seconds on latest data
src1 0
src1 1
src1 2
src1 3
src2 3
src1 4
src2 4
src1 5
src2 5
For cold observables:
val obs = Observable.interval(1.second)
Task.parZip2(testColdHotObservable1(obs), testColdHotObservable2(obs)).runToFuture
with output, it seems that a new observable is created for a second subscribe? Is this expected? so each new subscription for the cold observable will create a new observable...
src1 0
src1 1
src1 2
src2 0
src1 3
src2 1
src1 4
src2 2
Observable.behavior.refCount
, but it turns out it need the initial value. Is it possible to create one without init value?
with output, it seems that a new observable is created for a second subscribe? Is this expected? so each new subscription for the cold observable will create a new observable...
Yes, it's the same as Task
. Cold Observable
is a blueprint of some computation
sounds like
behavior(None)
might work...
Yeah, and then you can call collect { case Some(v) => v }
to have Observable[A]
so in this case, will choose one over another make any difference
If you subscribe with N hot observables, you will have 1 connection to the data source, if you subscribe with N cold observables, you will have N separate connections.
The other difference could be back-pressure. Hot Observable will back-pressure on all subscribers. Cold Observables will be independent
I guess hot observable cache data in memory
Not necessarily, it depends on underlying Subject
. ReplaySubject
caches data, PublishSubject
does not.
replay
observable with a replay value of 1 ought to work?
Hi. Is monix going to be compatible with cats-effect 3? I’ve tried bumping fs2 and cats-retry to 3.0.x and then I got some dependency error:
* org.typelevel:cats-effect_2.13:3.1.0 (early-semver) is selected over 2.5.1
+- com.github.cb372:cats-retry_2.13:3.0.0 (depends on 3.1.0)
+- io.monix:monix-catnap_2.13:3.4.0 (depends on 2.5.1)
After which I realized that monix is using cats-effect 2.5.1. In the github repo I don’t see anything related to cats-effect 33 either
@LeonardMeyer Scheduler
implementation uses ScheduledExecutorService
so it probably has some clock drifting but I'm not an expert here. Scheduler
is rather low-level though, it's like an ExecutionContext
. As a user, you'd be probably more interested in Observable
. If you need to schedule something to run in specific time intervals, there are methods like Observable.intervalAtFixedRate
and Observable.intervalWithFixedDelay
that use a monotonic clock. If you need to run jobs during a very specific time (cron-like), then we don't have anything built-in at the moment. There was a monix/monix#885 but it wasn't finished
BTW we're experimenting with discord (invite link: https://discord.com/invite/wsVZSEx4Nw) which is a bit more active, it might be a good idea to repeat some questions there
series/4.x
branch
switchMap
deadlock, no idea why it is causing that, mostly likely failed to cancel previous child observable (the child observable used to be cold, the code to unscribe from external source may not be done cleanly), but we are able to rewrite the code without switchMap
. the app has been running in prod for a week. no more application hang, so it seems the issue is solved. BTW, I dig a bit into monix code, now I understand, uncaught exception report forNonFatal
exceptions is logged everywhere, but it shouldn't kill the threads. Our application exceptions are definitely NonFatal
...