Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Alexandru Nedelcu
@alexandru

will Monix be sticking to the global scala fork-join pool or ditch it in favor of CE3 style work stealing fixed size pool?

@rohan-sircar we haven't decided yet. Scala's fork-join is good for executing blocking tasks alongside the CPU-bounded ones. We also have an issue for improving Scheduler with a blocking operation and we'll decide from there.

Alexandru Nedelcu
@alexandru
I've created a new issue — monix/monix#1432
Oleg Pyzhcov
@oleg-py
or - hear me out - we can drop Task and use c.e.IO :-P
Alexandru Nedelcu
@alexandru
:))
Alexandru Nedelcu
@alexandru
Even if that happened, Scheduler is awesome and here to stay, and I still want the work-stealing stuff in Monix too.
Rohan Sircar
@rohan-sircar
Yep, I saw that. I think Task.blocking is much more ergonomic than carrying an io scheduler around, and that naturally leads to specializing the main thread pool for async tasks as the next step, since dealing with blocking tasks is now much more simple.
Also now that I remember, there was a project where I wrapped a better-files file watcher in Observable, and no matter how much I tried to catch the exception, it always threw a Caused by: java.nio.file.ClosedWatchServiceException on application exit. Now I know why...
Alexandru Nedelcu
@alexandru

Task.blocking is much more ergonomic than carrying an io scheduler around

Yes, but that's the building block. We start from the Scheduler and work our way up.

java.nio.file.ClosedWatchServiceException

If you're referring to NonFatal, that doesn't sound like a fatal exception. Might have been something else.

cc @rohan-sircar
Rohan Sircar
@rohan-sircar
Oh I wasn't disagreeing with scheduler#blocking ^^", just saying it might make sense to want to switch to work-stealing pool from the global scala pool

If you're referring to NonFatal, that doesn't sound like a fatal exception. Might have been something else.

Okay.. need to look into it further then

Alexandru Nedelcu
@alexandru
@rohan-sircar ah, OK, got too late last night 🙂
dinosaur
@dinosaur-lin

@Avasil @alexandru We encounter another issue today in prod. this time, we are able to capture the thread dump as suggested by @Avasil . majik-scheduler is the fork-join scheduler created.

"majik-scheduler-1245" #1245 daemon prio=5 os_prio=0 tid=0x00007f850000b800 nid=0x699c waiting on condition [0x00007f851a6e7000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000006a72e5ad0> (a monix.execution.internal.forkJoin.AdaptedForkJoinPool)
    at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
    - None

Many of above

"majik-scheduler-1238" #1238 daemon prio=5 os_prio=0 tid=0x00007f850004c800 nid=0x690e waiting on condition [0x00007f8519cdd000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000006a72e5ad0> (a monix.execution.internal.forkJoin.AdaptedForkJoinPool)
    at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
    - None

Occasionally this,, this seems to be monix's own thread pool

"monix-scheduler-367" #367 daemon prio=5 os_prio=0 tid=0x00007f8530020800 nid=0x2be2 waiting on condition [0x00007f85bc4c0000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000006be727628> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - None

No dead lock find in the thread dump, but are these thread waiting for something forever?

dinosaur
@dinosaur-lin
https://www.fatalerrors.org/a/excutors-framework-of-j.u.c-implementation-of-fork-join-framework.html, it seems awaitWork is worker thread waiting to get a task to run. so it seems there are many idle threads in the pool...
Oleg Pyzhcov
@oleg-py
yeah you might be waiting on a task that never completes for some reason
like cancelling and then joining a fiber
or hitting some bug like #1364
dinosaur
@dinosaur-lin
Thanks @oleg-py Here is some thread dump captured from prod at different times https://gist.github.com/dinosaur-lin/635d9f4cc3cee57aec6e7756fd2c660f, in case anyone is interested. It turns out there were deadlock happening in the application. It seems a bit odd that it all related to switchMap code with mulitple level of combineLatestMap4. However, looking closely at our code, it is hard to see how the deadlock happening, but it did give up some hint to add more logging at relevant locations to eventually understand how the deadlock might happen.
dinosaur
@dinosaur-lin
def testColdHotObservable1(src: Observable[Long]): Task[Unit] = {
    src.consumeWith(Consumer.foreach[Long](i => println(s"src1 $i")))
  }

 def testColdHotObservable2(src: Observable[Long]): Task[Unit] = {
    src.consumeWith(Consumer.foreach[Long](i => println(s"src2 $i"))).delayExecution(3.seconds)
  }

If running as hot observable:

val obs = Observable.interval(1.second).publish.refCount
    Task.parZip2(testColdHotObservable1(obs), testColdHotObservable2(obs)).runToFuture

it print as expected, where the src2 come up after 3 seconds on latest data

src1 0
src1 1
src1 2
src1 3
src2 3
src1 4
src2 4
src1 5
src2 5

For cold observables:

val obs = Observable.interval(1.second)
    Task.parZip2(testColdHotObservable1(obs), testColdHotObservable2(obs)).runToFuture

with output, it seems that a new observable is created for a second subscribe? Is this expected? so each new subscription for the cold observable will create a new observable...

src1 0
src1 1
src1 2
src2 0
src1 3
src2 1
src1 4
src2 2
We would like to create a hot observable with Observable.behavior.refCount, but it turns out it need the initial value. Is it possible to create one without init value?
dinosaur
@dinosaur-lin
sounds like behavior(None) might work...
dinosaur
@dinosaur-lin
Regarding cold observable vs. hot observable question... we have a lot observables sharing in the code with different multiple consumer attached into it. As our source is from external real time data source, so even for cold observable, if we attach a new consumer on it, it will subscribe to external data from that time point (... no history data) so effectively, the behaviour of cold observable and hot observable are pretty much similar... so in this case, will choose one over another make any difference? (I guess hot observable cache data in memory. that's the only difference?)
Rohan Sircar
@rohan-sircar
Observable.publish.refCount ?
Piotr Gawryś
@Avasil

with output, it seems that a new observable is created for a second subscribe? Is this expected? so each new subscription for the cold observable will create a new observable...

Yes, it's the same as Task. Cold Observable is a blueprint of some computation

sounds like behavior(None) might work...

Yeah, and then you can call collect { case Some(v) => v } to have Observable[A]

so in this case, will choose one over another make any difference

If you subscribe with N hot observables, you will have 1 connection to the data source, if you subscribe with N cold observables, you will have N separate connections.
The other difference could be back-pressure. Hot Observable will back-pressure on all subscribers. Cold Observables will be independent

I guess hot observable cache data in memory

Not necessarily, it depends on underlying Subject. ReplaySubject caches data, PublishSubject does not.

dinosaur
@dinosaur-lin
hi @rohan-sircar Obersable.publish.refCount will miss the last pricing tick. so it doesn't work for us...
Rohan Sircar
@rohan-sircar
huh, I thought publish and behavior are the same thing with the latter taking an initial value?
anyway, iiuc if you want the last event replayed, maybe a replay observable with a replay value of 1 ought to work?
dinosaur
@dinosaur-lin
@rohan-sircar thanks, that should work for us without changing all the type signature like bahavior,thank you
Rohan Sircar
@rohan-sircar
Cool. Let us know if it works out (or not :p)
Trần Tiến Đức
@trantienduchn
hello :) I'm finding a handy way to do retry-backoff with monix. is their any thing I can use? I'm imagining something like this:
Retry.with(Observable.from(...))
   .maxRetries(5)
  .initialBackoff(1.second)
  ....
Jasper Moeys
@Jasper-M
1 reply
lmeyer
@LeonardMeyer
Hi. We're considering Monix for some project. We're primarily interested at executing jobs when data is pushed through HTTP and pulled from a FS. We were wondering if Monix is suited for this and about its scheduling capabilities. Does the scheduler suffer from clock drifiting like most other implementation ?
Didac
@umbreak

Hi. Is monix going to be compatible with cats-effect 3? I’ve tried bumping fs2 and cats-retry to 3.0.x and then I got some dependency error:

 * org.typelevel:cats-effect_2.13:3.1.0 (early-semver) is selected over 2.5.1
     +- com.github.cb372:cats-retry_2.13:3.0.0             (depends on 3.1.0)
     +- io.monix:monix-catnap_2.13:3.4.0                   (depends on 2.5.1)

After which I realized that monix is using cats-effect 2.5.1. In the github repo I don’t see anything related to cats-effect 33 either

Piotr Gawryś
@Avasil

@LeonardMeyer Scheduler implementation uses ScheduledExecutorService so it probably has some clock drifting but I'm not an expert here. Scheduler is rather low-level though, it's like an ExecutionContext. As a user, you'd be probably more interested in Observable. If you need to schedule something to run in specific time intervals, there are methods like Observable.intervalAtFixedRate and Observable.intervalWithFixedDelay that use a monotonic clock. If you need to run jobs during a very specific time (cron-like), then we don't have anything built-in at the moment. There was a monix/monix#885 but it wasn't finished

BTW we're experimenting with discord (invite link: https://discord.com/invite/wsVZSEx4Nw) which is a bit more active, it might be a good idea to repeat some questions there

@umbreak It is not yet compatible and it's going to take some time. We are working on it on series/4.x branch
Didac
@umbreak
thanks for the feedback @Avasil
Kamil Kloch
@kamilkloch
hello all, is there a way to lock a Task into a specific scheduler? (akin to ZIO.lock)
Kamil Kloch
@kamilkloch

hello all, is there a way to lock a Task into a specific scheduler? (akin to ZIO.lock)

Task().executeOn?

Piotr Gawryś
@Avasil
yes
Kamil Kloch
@kamilkloch

yes

thanks! :)

dinosaur
@dinosaur-lin
@rohan-sircar I think it worked out perfectly. I don't see any issue. @Avasil @oleg-py @alexandru about the switchMap deadlock, no idea why it is causing that, mostly likely failed to cancel previous child observable (the child observable used to be cold, the code to unscribe from external source may not be done cleanly), but we are able to rewrite the code without switchMap . the app has been running in prod for a week. no more application hang, so it seems the issue is solved. BTW, I dig a bit into monix code, now I understand, uncaught exception report forNonFatal exceptions is logged everywhere, but it shouldn't kill the threads. Our application exceptions are definitely NonFatal...
Trần Tiến Đức
@trantienduchn
Hi, is there a feature that allow an Observable can retryIf/retryWhen + maxRetries + exponentialBackOff ?
Jasper Moeys
@Jasper-M
Subject is defined with separate in I and out O type parameters. Is there a way to use this? E.g. a way to create a Subject[Int, String]?
Piotr Gawryś
@Avasil
@Jasper-M Yes, but it requires cats syntax ( import cats.syntax.profunctor._ ). Check Profunctor methods like dimap, lmap, rmap. We should add something on Subject directly
Jasper Moeys
@Jasper-M
Ah thanks. I didn't think to look in there
Frédéric Cabestre
@fcabestre

Hello everybody,

I'm trying to port an MQTT client library based on fs2 to cats effect 3. I'm just at the beginning on the journey, carefully following the cats effect migration guide. One of the first steps is to update dependencies and check for eviction errors. I use monix in an example to show how to write a simple client with this library yet demonstrating compatibility with monix and zio.

And here is my issue. I get the following eviction check error while I though (but I may be wrong) that monix 3.4.0 compatible with cats effect 3.x.

[error] (examples / evicted) found version conflict(s) in library dependencies; some are suspected to be binary incompatible:
[error] 
[error]     * org.typelevel:cats-effect_2.12:3.1.1 (early-semver) is selected over {3.1.0, 2.5.1}
[error]         +- com.comcast:ip4s-core_2.12:3.0.3                   (depends on 3.1.1)
[error]         +- net.sigusr:fs2-mqtt_2.12:0.5.1+2-eef90374+20210624-1446-SNAPSHOT (depends on 3.1.1)
[error]         +- co.fs2:fs2-core_2.12:3.0.4                         (depends on 3.1.1)
[error]         +- io.monix:monix-catnap_2.12:3.4.0                   (depends on 2.5.1)
[error]         +- com.github.cb372:cats-retry_2.12:3.0.0             (depends on 3.1.0)

Could someone tell me where I'm wrong please.

Oleg Pyzhcov
@oleg-py
@fcabestre no modules of monix are yet available for cats-effect-3. I did some porting for catnap though, you might be able to "borrow" what you need from that PR if you're in a rush: monix/monix#1390
sadly cats-effect is quite pervasive in monix modules that are not monix-execution, and the API changes are very breaking
Frédéric Cabestre
@fcabestre
Nice, thank you for your help @oleg-py . So I had completely misunderstood availability of monix for cats-effect 3, my bad.
dinosaur
@dinosaur-lin
Hi guys, I modified monix's example code for Observable.create. Something puzzled me:
```scala