Caused by: java.nio.file.ClosedWatchServiceException on application exit. Now I know why...
Task.blocking is much more ergonomic than carrying an io scheduler around
Yes, but that's the building block. We start from the Scheduler and work our way up.
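For anyone reading along, here is a minimal sketch of the "building block" approach: shifting a blocking call onto a dedicated io Scheduler with executeOn. The object name, the scheduler name "sketch-io" and the helper blockingThreadName are made up for illustration, and nothing here assumes that Task.blocking is available in the monix version you are on.

```scala
import monix.eval.Task
import monix.execution.Scheduler

object BlockingSketch {
  // Hypothetical dedicated scheduler for blocking work (unbounded, cached threads).
  val io: Scheduler = Scheduler.io(name = "sketch-io")

  // Stand-in for a blocking call: it only reports which thread it ran on.
  // executeOn shifts the evaluation of this Task onto the `io` scheduler.
  def blockingThreadName: Task[String] =
    Task(Thread.currentThread().getName).executeOn(io)
}
```

The rest of the Task chain keeps running on whatever Scheduler it was started with; only the wrapped part is moved to io.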
java.nio.file.ClosedWatchServiceException
If you're referring to NonFatal, that doesn't sound like a fatal exception. Might have been something else.
Okay.. need to look into it further then
@Avasil @alexandru We encountered another issue in prod today. This time we were able to capture a thread dump, as suggested by @Avasil. majik-scheduler is the fork-join scheduler we created.
"majik-scheduler-1245" #1245 daemon prio=5 os_prio=0 tid=0x00007f850000b800 nid=0x699c waiting on condition [0x00007f851a6e7000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006a72e5ad0> (a monix.execution.internal.forkJoin.AdaptedForkJoinPool)
at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Locked ownable synchronizers:
- None
There are many threads like the one above.
"majik-scheduler-1238" #1238 daemon prio=5 os_prio=0 tid=0x00007f850004c800 nid=0x690e waiting on condition [0x00007f8519cdd000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006a72e5ad0> (a monix.execution.internal.forkJoin.AdaptedForkJoinPool)
at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Locked ownable synchronizers:
- None
Occasionally there is also this one, which seems to be monix's own thread pool:
"monix-scheduler-367" #367 daemon prio=5 os_prio=0 tid=0x00007f8530020800 nid=0x2be2 waiting on condition [0x00007f85bc4c0000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006be727628> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
No deadlock was found in the thread dump, but are these threads waiting for something forever?
awaitWork is a worker thread waiting to get a task to run, so it seems there are many idle threads in the pool...
switchMap code with multiple levels of combineLatestMap4. However, looking closely at our code, it is hard to see how the deadlock happens, but it did give us some hints about where to add more logging so that we can eventually understand how the deadlock might happen.
def testColdHotObservable1(src: Observable[Long]): Task[Unit] = {
  src.consumeWith(Consumer.foreach[Long](i => println(s"src1 $i")))
}
def testColdHotObservable2(src: Observable[Long]): Task[Unit] = {
  src.consumeWith(Consumer.foreach[Long](i => println(s"src2 $i"))).delayExecution(3.seconds)
}
If running as hot observable:
val obs = Observable.interval(1.second).publish.refCount
Task.parZip2(testColdHotObservable1(obs), testColdHotObservable2(obs)).runToFuture
It prints as expected: src2 shows up after 3 seconds and picks up from the latest data.
src1 0
src1 1
src1 2
src1 3
src2 3
src1 4
src2 4
src1 5
src2 5
For cold observables:
val obs = Observable.interval(1.second)
Task.parZip2(testColdHotObservable1(obs), testColdHotObservable2(obs)).runToFuture
With the output below, it seems that a new observable is created for the second subscription. Is this expected? So each new subscription to a cold observable creates a new observable...
src1 0
src1 1
src1 2
src2 0
src1 3
src2 1
src1 4
src2 2
Observable.behavior.refCount, but it turns out it needs an initial value. Is it possible to create one without an initial value?
With the output below, it seems that a new observable is created for the second subscription. Is this expected? So each new subscription to a cold observable creates a new observable...
Yes, it's the same as Task. A cold Observable is a blueprint of some computation.
Sounds like behavior(None) might work...
Yeah, and then you can call collect { case Some(v) => v } to have Observable[A]
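A minimal sketch of that suggestion, assuming monix 3.x: wrap each element in an Option, share the stream through behavior with None as the initial value, then drop the placeholder with collect. The object name and the helper shared are made up for illustration.

```scala
import monix.execution.Scheduler
import monix.reactive.Observable

object BehaviorWithoutInitialValue {
  // Shares `source` among subscribers without needing a real initial value.
  def shared[A](source: Observable[A])(implicit s: Scheduler): Observable[A] =
    source
      .map[Option[A]](a => Some(a))   // lift every element into Option
      .behavior(Option.empty[A])      // None is the placeholder initial value
      .refCount                       // connect on first subscriber
      .collect { case Some(v) => v }  // drop the placeholder, back to Observable[A]
}
```

A late subscriber first receives the most recent Some (if there is one) and then the live elements; a subscriber that arrives before any element only sees the None, which collect filters out.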
So in this case, will choosing one over the other make any difference?
If you subscribe with N hot observables, you will have 1 connection to the data source, if you subscribe with N cold observables, you will have N separate connections.
The other difference could be back-pressure. Hot Observable will back-pressure on all subscribers. Cold Observables will be independent
I guess hot observables cache data in memory
Not necessarily, it depends on the underlying Subject. ReplaySubject caches data, PublishSubject does not.
replay observable with a replay value of 1 ought to work?
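In case it helps, a rough sketch of the replay idea, assuming monix 3.x. replay(1) keeps only the latest element for late subscribers, so no initial value is required. The object name and the helper shared are hypothetical.

```scala
import monix.execution.Scheduler
import monix.reactive.Observable

object ReplayLatest {
  // Shares `source`; late subscribers first get the most recent element (if any).
  def shared[A](source: Observable[A])(implicit s: Scheduler): Observable[A] =
    source
      .replay(1)  // buffer size 1: remember only the latest element
      .refCount   // connect to the source on the first subscription
}
```

Unlike behavior, a subscriber that arrives before the first element simply gets nothing until the source emits.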
Hi. Is monix going to be compatible with cats-effect 3? I’ve tried bumping fs2 and cats-retry to 3.0.x and then I got some dependency error:
* org.typelevel:cats-effect_2.13:3.1.0 (early-semver) is selected over 2.5.1
+- com.github.cb372:cats-retry_2.13:3.0.0 (depends on 3.1.0)
+- io.monix:monix-catnap_2.13:3.4.0 (depends on 2.5.1)
After which I realized that monix is using cats-effect 2.5.1. In the GitHub repo I don't see anything related to cats-effect 3 either.
@LeonardMeyer The Scheduler implementation uses ScheduledExecutorService, so it probably has some clock drift, but I'm not an expert here. Scheduler is rather low-level though, it's like an ExecutionContext. As a user, you'd probably be more interested in Observable. If you need to schedule something to run at specific time intervals, there are methods like Observable.intervalAtFixedRate and Observable.intervalWithFixedDelay that use a monotonic clock. If you need to run jobs at a very specific time (cron-like), then we don't have anything built-in at the moment. There was monix/monix#885, but it wasn't finished.
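A small sketch of the two interval builders mentioned above, assuming monix 3.x. The object name, the job function and the 100 ms period are placeholders for illustration only.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
import scala.concurrent.duration._

object IntervalSketch {
  // Hypothetical periodic job; the tick is the counter emitted by the interval.
  def job(tick: Long): Task[String] = Task(s"job $tick")

  // Fixed rate: ticks are scheduled every period, counted from the start.
  val fixedRate: Task[List[String]] =
    Observable.intervalAtFixedRate(100.millis).mapEval(job).take(3).toListL

  // Fixed delay: the delay is counted from the end of the previous tick's processing.
  val fixedDelay: Task[List[String]] =
    Observable.intervalWithFixedDelay(100.millis).mapEval(job).take(3).toListL

  // Blocks the calling thread until three ticks were processed (JVM only).
  def demo()(implicit s: Scheduler): List[String] = fixedRate.runSyncUnsafe()
}
```

In a real app you would drop the take(3) and consume the stream with something like completedL instead of collecting it into a list.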
BTW we're experimenting with discord (invite link: https://discord.com/invite/wsVZSEx4Nw) which is a bit more active, it might be a good idea to repeat some questions there
series/4.x branch
switchMap deadlock, no idea why it is causing that. Most likely it failed to cancel the previous child observable (the child observable used to be cold, and the code that unsubscribes from the external source may not have been done cleanly), but we were able to rewrite the code without switchMap. The app has been running in prod for a week with no more application hangs, so it seems the issue is solved. BTW, I dug a bit into the monix code and now I understand: the uncaught exception report for NonFatal exceptions is logged everywhere, but it shouldn't kill the threads. Our application exceptions are definitely NonFatal...
Hello everybody,
I'm trying to port an MQTT client library based on fs2 to cats effect 3. I'm just at the beginning of the journey, carefully following the cats effect migration guide. One of the first steps is to update dependencies and check for eviction errors. I use monix in an example to show how to write a simple client with this library while also demonstrating compatibility with monix and zio.
And here is my issue. I get the following eviction check error, although I thought (but I may be wrong) that monix 3.4.0 was compatible with cats effect 3.x.
[error] (examples / evicted) found version conflict(s) in library dependencies; some are suspected to be binary incompatible:
[error]
[error] * org.typelevel:cats-effect_2.12:3.1.1 (early-semver) is selected over {3.1.0, 2.5.1}
[error] +- com.comcast:ip4s-core_2.12:3.0.3 (depends on 3.1.1)
[error] +- net.sigusr:fs2-mqtt_2.12:0.5.1+2-eef90374+20210624-1446-SNAPSHOT (depends on 3.1.1)
[error] +- co.fs2:fs2-core_2.12:3.0.4 (depends on 3.1.1)
[error] +- io.monix:monix-catnap_2.12:3.4.0 (depends on 2.5.1)
[error] +- com.github.cb372:cats-retry_2.12:3.0.0 (depends on 3.1.0)
Could someone tell me where I'm wrong please.
package test

import monix.eval.Task
import monix.execution.{Ack, Scheduler}
import monix.reactive.{Observable, OverflowStrategy}
import monix.reactive.observers.Subscriber
import scala.concurrent.duration._

object TestApp1 {
  def main(args: Array[String]): Unit = {
    implicit val s = Scheduler.forkJoin(2, 5)

    // Pushes n after a 100 ms delay, then loops or stops depending on the Ack.
    def producerLoop(sub: Subscriber[Int], n: Int = 0): Task[Unit] = {
      Task.deferFuture(sub.onNext(n))
        .delayExecution(100.millis)
        .flatMap {
          case Ack.Continue =>
            println(s"producer to produce ${n + 1}")
            producerLoop(sub, n + 1)
          case Ack.Stop =>
            println(s"producer is stopped at $n")
            Task { println(s"producer is stopped at $n ") }
        }
    }

    val source: Observable[Int] =
      Observable.create(OverflowStrategy.Unbounded) { sub =>
        producerLoop(sub)
          .guarantee(Task(println("Producer has been completed")))
          .runToFuture(sub.scheduler)
      }

    // Subscribe, then cancel the subscription after one second.
    val cancel = source.dump("O").subscribe()
    Task.eval(cancel.cancel()).delayExecution(1000.millis).runToFuture
    s.awaitTermination(1000.minutes)
  }
}