dinosaur
@dinosaur-lin
hi @rohan-sircar Observable.publish.refCount will miss the last pricing tick, so it doesn't work for us...
Rohan Sircar
@rohan-sircar
huh, I thought publish and behavior are the same thing with the latter taking an initial value?
anyway, iiuc if you want the last event replayed, maybe a replay observable with a buffer size of 1 ought to work?
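
A minimal sketch of the suggestion above, assuming Monix 3.x. `replay(1)` multicasts through a replay buffer of size 1, so a subscriber that joins late should still receive the most recent tick. The `ticks` subject and the price values are hypothetical stand-ins for the real pricing feed.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import monix.reactive.subjects.PublishSubject

import scala.concurrent.Await
import scala.concurrent.duration._

object ReplayLastTick {
  // Returns the first value seen by a subscriber that joins after two ticks were pushed.
  def lateSubscriberSees(): Double = {
    // Hypothetical hot source standing in for the pricing feed.
    val ticks = PublishSubject[Double]()

    // replay(1) caches the latest element; refCount connects on the first
    // subscription and disconnects once the last subscriber is gone.
    val shared: Observable[Double] = ticks.replay(1).refCount

    // The early subscriber keeps the shared connection alive.
    val early = shared.foreach(_ => ())

    // Push two ticks, waiting for each acknowledgement (back-pressure contract).
    val pushed = ticks.onNext(101.5).flatMap(_ => ticks.onNext(102.0))
    Await.result(pushed, 5.seconds)

    // The late subscriber starts from the cached tick instead of missing it.
    val late = shared.firstL.runSyncUnsafe(5.seconds)
    early.cancel()
    late
  }
}
```

Unlike `behavior`, this keeps the element type unchanged and needs no initial value, which matches the concern about type signatures raised in the thread.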
dinosaur
@dinosaur-lin
@rohan-sircar thanks, that should work for us without changing all the type signatures like behavior would, thank you
Rohan Sircar
@rohan-sircar
Cool. Let us know if it works out (or not :p)
Trần Tiến Đức
@trantienduchn
hello :) I'm looking for a handy way to do retry with backoff in monix. Is there anything I can use? I'm imagining something like this:
Retry.with(Observable.from(...))
  .maxRetries(5)
  .initialBackoff(1.second)
  ....
lmeyer
@LeonardMeyer
Hi. We're considering Monix for a project. We're primarily interested in executing jobs when data is pushed through HTTP and pulled from a FS. We were wondering whether Monix is suited for this, and about its scheduling capabilities. Does the scheduler suffer from clock drift like most other implementations?
Didac
@umbreak

Hi. Is monix going to be compatible with cats-effect 3? I’ve tried bumping fs2 and cats-retry to 3.0.x and then I got some dependency error:

 * org.typelevel:cats-effect_2.13:3.1.0 (early-semver) is selected over 2.5.1
     +- com.github.cb372:cats-retry_2.13:3.0.0             (depends on 3.1.0)
     +- io.monix:monix-catnap_2.13:3.4.0                   (depends on 2.5.1)

After which I realized that monix is using cats-effect 2.5.1. In the GitHub repo I don't see anything related to cats-effect 3 either.

Piotr Gawryś
@Avasil

@LeonardMeyer Scheduler implementation uses ScheduledExecutorService so it probably has some clock drifting but I'm not an expert here. Scheduler is rather low-level though, it's like an ExecutionContext. As a user, you'd be probably more interested in Observable. If you need to schedule something to run in specific time intervals, there are methods like Observable.intervalAtFixedRate and Observable.intervalWithFixedDelay that use a monotonic clock. If you need to run jobs during a very specific time (cron-like), then we don't have anything built-in at the moment. There was a monix/monix#885 but it wasn't finished
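
A short sketch of the two interval builders named in the reply above, assuming Monix 3.x. The `job` function is a hypothetical placeholder for the real work; only `Observable.intervalAtFixedRate`, `Observable.intervalWithFixedDelay` and `mapEval` come from Monix.

```scala
import monix.eval.Task
import monix.reactive.Observable

import scala.concurrent.duration._

object IntervalJobs {
  // Hypothetical job; replace the body with the real work.
  def job(tick: Long): Task[Long] =
    Task { println(s"running job #$tick"); tick }

  // Fixed rate: aims to start a job every `period`, taking into account
  // how long the previous job took.
  def atFixedRate(period: FiniteDuration): Observable[Long] =
    Observable.intervalAtFixedRate(period).mapEval(job)

  // Fixed delay: waits `delay` after the previous job has finished.
  def withFixedDelay(delay: FiniteDuration): Observable[Long] =
    Observable.intervalWithFixedDelay(delay).mapEval(job)
}
```

With an implicit `Scheduler` in scope, something like `IntervalJobs.atFixedRate(1.second).take(3).toListL.runSyncUnsafe()` runs the first three ticks.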

BTW we're experimenting with discord (invite link: https://discord.com/invite/wsVZSEx4Nw) which is a bit more active, it might be a good idea to repeat some questions there

@umbreak It is not yet compatible and it's going to take some time. We are working on it on series/4.x branch
Didac
@umbreak
thanks for the feedback @Avasil
Kamil Kloch
@kamilkloch
hello all, is there a way to lock a Task into a specific scheduler? (akin to ZIO.lock)
Kamil Kloch
@kamilkloch

> hello all, is there a way to lock a Task into a specific scheduler? (akin to ZIO.lock)

Task().executeOn?

Piotr Gawryś
@Avasil
yes
Kamil Kloch
@kamilkloch

> yes

thanks! :)
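
A small sketch of `executeOn` as confirmed in the exchange above, assuming Monix 3.x. The scheduler name `my-io` is made up for illustration; the task reports the name of the thread it ran on, so the effect of the override is visible.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

import scala.concurrent.duration._

object ExecuteOnExample {
  // Dedicated scheduler for blocking work; the name is hypothetical.
  val io: Scheduler = Scheduler.io(name = "my-io")

  // The task body is evaluated on `io`, even though the task is
  // started with the global scheduler below.
  val threadName: Task[String] =
    Task(Thread.currentThread().getName).executeOn(io)

  def run(): String = threadName.runSyncUnsafe(5.seconds)
}
```

Whether this fully matches `ZIO.lock` semantics is not settled by the thread; the sketch only shows that the wrapped task is evaluated on the given scheduler.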

dinosaur
@dinosaur-lin
@rohan-sircar I think it worked out perfectly. I don't see any issue. @Avasil @oleg-py @alexandru about the switchMap deadlock: no idea why it is causing that, most likely it failed to cancel the previous child observable (the child observable used to be cold, and the code to unsubscribe from the external source may not have run cleanly), but we were able to rewrite the code without switchMap. The app has been running in prod for a week with no more application hangs, so it seems the issue is solved. BTW, I dug a bit into the monix code and now I understand: the uncaught exception report for NonFatal exceptions is logged everywhere, but it shouldn't kill the threads. Our application exceptions are definitely NonFatal...
Trần Tiến Đức
@trantienduchn
Hi, is there a feature that allows an Observable to retryIf/retryWhen + maxRetries + exponentialBackOff?
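
The thread does not name a built-in combinator for this. One possible hand-rolled sketch, assuming Monix 3.x, combines `onErrorHandleWith` and `delayExecution`; the name `retryWithBackoff` and its parameters are hypothetical, not a Monix API.

```scala
import monix.reactive.Observable

import scala.concurrent.duration._

object RetryBackoff {
  // Resubscribes to `source` after a failure, doubling the delay each time,
  // for at most `maxRetries` attempts and only while `shouldRetry` accepts the error.
  def retryWithBackoff[A](source: Observable[A], maxRetries: Int, delay: FiniteDuration)(
      shouldRetry: Throwable => Boolean): Observable[A] =
    source.onErrorHandleWith {
      case ex if maxRetries > 0 && shouldRetry(ex) =>
        retryWithBackoff(source, maxRetries - 1, delay * 2)(shouldRetry)
          .delayExecution(delay)
      case ex =>
        // Out of retries, or an error that should not be retried.
        Observable.raiseError(ex)
    }
}
```

Each retry subscribes to the source again, so elements emitted before the failure are emitted again by a cold source; downstream has to tolerate that.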
Jasper Moeys
@Jasper-M
Subject is defined with separate in I and out O type parameters. Is there a way to use this? E.g. a way to create a Subject[Int, String]?
Piotr Gawryś
@Avasil
@Jasper-M Yes, but it requires cats syntax ( import cats.syntax.profunctor._ ). Check Profunctor methods like dimap, lmap, rmap. We should add something on Subject directly
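
A sketch of the Profunctor route described above, assuming Monix 3.x with cats on the classpath and the `Profunctor[Subject]` instance the reply refers to. The helper name `acceptingInts` and the `#` prefix are illustrative only.

```scala
import cats.syntax.profunctor._
import monix.reactive.subjects.Subject

object SubjectProfunctor {
  // Adapts the input side only: the result accepts Int values,
  // converts them to String, and still emits String downstream.
  def acceptingInts(base: Subject[String, String]): Subject[Int, String] =
    base.lmap((i: Int) => s"#$i")
}
```

`rmap` adapts the output side in the same way, and `dimap` does both at once. Ascribing the base value as `Subject[String, String]` (rather than a concrete subclass) is likely needed for the syntax to resolve.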
Jasper Moeys
@Jasper-M
Ah thanks. I didn't think to look in there
Frédéric Cabestre
@fcabestre

Hello everybody,

I'm trying to port an MQTT client library based on fs2 to cats effect 3. I'm just at the beginning of the journey, carefully following the cats effect migration guide. One of the first steps is to update dependencies and check for eviction errors. I use monix in an example to show how to write a simple client with this library while demonstrating compatibility with monix and zio.

And here is my issue. I get the following eviction check error, while I thought (but I may be wrong) that monix 3.4.0 was compatible with cats effect 3.x.

[error] (examples / evicted) found version conflict(s) in library dependencies; some are suspected to be binary incompatible:
[error] 
[error]     * org.typelevel:cats-effect_2.12:3.1.1 (early-semver) is selected over {3.1.0, 2.5.1}
[error]         +- com.comcast:ip4s-core_2.12:3.0.3                   (depends on 3.1.1)
[error]         +- net.sigusr:fs2-mqtt_2.12:0.5.1+2-eef90374+20210624-1446-SNAPSHOT (depends on 3.1.1)
[error]         +- co.fs2:fs2-core_2.12:3.0.4                         (depends on 3.1.1)
[error]         +- io.monix:monix-catnap_2.12:3.4.0                   (depends on 2.5.1)
[error]         +- com.github.cb372:cats-retry_2.12:3.0.0             (depends on 3.1.0)

Could someone tell me where I'm wrong, please?

Oleg Pyzhcov
@oleg-py
@fcabestre no modules of monix are yet available for cats-effect-3. I did some porting for catnap though, you might be able to "borrow" what you need from that PR if you're in a rush: monix/monix#1390
sadly cats-effect is quite pervasive in monix modules that are not monix-execution, and the API changes are very breaking
Frédéric Cabestre
@fcabestre
Nice, thank you for your help @oleg-py . So I had completely misunderstood availability of monix for cats-effect 3, my bad.
dinosaur
@dinosaur-lin
Hi guys, I modified monix's example code for Observable.create. Something puzzled me:
```scala
package test

import monix.eval.Task
import monix.execution.{Ack, Scheduler}
import monix.reactive.{Observable, OverflowStrategy}
import monix.reactive.observers.Subscriber

import scala.concurrent.duration._

object TestApp1 {

  def main(args: Array[String]): Unit = {

    implicit val s = Scheduler.forkJoin(2, 5)

    // Emits n, n + 1, ... every 100 ms until the subscriber answers with Stop.
    def producerLoop(sub: Subscriber[Int], n: Int = 0): Task[Unit] = {
      Task.deferFuture(sub.onNext(n))
        .delayExecution(100.millis)
        .flatMap {
          case Ack.Continue =>
            println(s"producer to produce ${n + 1}")
            producerLoop(sub, n + 1)
          case Ack.Stop =>
            println(s"producer is stopped at $n")
            Task { println(s"producer is stopped at $n ")}
        }
    }

    val source: Observable[Int] =
      Observable.create(OverflowStrategy.Unbounded) { sub =>
        producerLoop(sub)
          .guarantee(Task(println("Producer has been completed")))
          .runToFuture(sub.scheduler)
      }

    // Subscribe, then cancel the subscription after one second.
    val cancel = source.dump("O").subscribe()
    Task.eval( cancel.cancel() ).delayExecution(1000.millis).runToFuture
    s.awaitTermination(1000.minutes)
  }
}
```
0: O --> 0
producer to produce 1
producer to produce 2
1: O --> 1
producer to produce 3
2: O --> 2
producer to produce 4
3: O --> 3
producer to produce 5
4: O --> 4
producer to produce 6
5: O --> 5
producer to produce 7
6: O --> 6
producer to produce 8
7: O --> 7
producer to produce 9
8: O --> 8
Producer has been completed
10: O canceled
The output doesn't contain "producer is stopped at n". Any reason why? I think our code base uses this trick to clean up some resources when the observable is cancelled, but it seems it is never called...
Oleg Pyzhcov
@oleg-py
@dinosaur-lin If the task gets cancelled during delayExecution or deferFuture part, the flatMap won't ever execute. You need to use bracket and handle cancellation
of course it'd be best if you can use a higher-level something instead
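
One way to handle cancellation explicitly, as the reply above suggests, is `guaranteeCase`, which tells the finalizer how the task ended. This is a sketch assuming Monix 3.x on cats-effect 2; `withCleanup` and `report` are hypothetical names.

```scala
import cats.effect.ExitCase
import monix.eval.Task

object CancelAwareCleanup {
  // Wraps `work` so that `report` is told how the task ended.
  // The Canceled branch is the one a plain flatMap never reaches.
  def withCleanup[A](work: Task[A])(report: String => Unit): Task[A] =
    work.guaranteeCase {
      case ExitCase.Completed => Task(report("completed"))
      case ExitCase.Error(e)  => Task(report(s"failed: ${e.getMessage}"))
      case ExitCase.Canceled  => Task(report("canceled"))
    }
}
```

Applied to the producer loop in the question, the resource cleanup would go in the `Canceled` branch (and probably the other two as well) instead of the `Ack.Stop` case.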
Jasper Moeys
@Jasper-M
If I have a val subject = ConcurrentSubject.publish[Foo] and I never call subject.onComplete(), does that leak resources?
Jasper Moeys
@Jasper-M
Say there is no reference left to the object of which subject is a member, and there are no subscribers left listening to subject. Will subject and all its internal bookkeeping just be garbage collected eventually?
dinosaur
@dinosaur-lin

@oleg-py thanks. Our use case is to convert a Java callback-style, event-driven API to an Observable, or Akka-style code to an Observable. The code looks like the following. I think the challenge here is that the code creating the resource (sub) is coupled with the code using it: either the event handler's sub.onNext or the Akka receive function's subject.onNext. Any idea how to rewrite this using higher-level Observable APIs so that unsub or context.stop(actor) can be called reliably?

Observable.create(overflowStrategy) { sub =>
  subManager.sub(address, new EventHandler {
    def onEventUpdate(e: Event): Unit =
      sub.onNext(e.data) match {
        case Continue => ()
        case Stop     => subManager.unsub(address)
      }
  })
}

or Akka code:

Observable.create(overflowStrategy) { subject =>
  val promise = Promise[Boolean]()
  val actor = context.actorOf(Props(new Actor {
    sub(address)
    override def receive: Receive = {
      case d: data =>
        subject.onNext(d) match {
          case Continue => ()
          case Stop     => promise.complete(result = scala.util.Success(true))
        }
    }
  }))
  promise.future.onComplete(_ => context.stop(actor))
}
Rohan Sircar
@rohan-sircar

@Jasper-M

> If I have a val subject = ConcurrentSubject.publish[Foo] and I never call subject.onComplete(), does that leak resources?

afaict, yes

@dinosaur-lin you could use a SingleAssignCancelable that sends a stop message to the actor
Rohan Sircar
@rohan-sircar
ime doing it the other way, having a queue as a field in the actor and getting an observable out of it was preferable, but that depends on the specifics of your problem at hand
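
For the callback variant of the question, one possible sketch (assuming Monix 3.x) returns a `Cancelable` from `Observable.create`, so the unsubscribe no longer depends only on a later `onNext` observing `Stop`. `Listener` and `EventSource` are hypothetical stand-ins for `EventHandler` and `subManager`, which are not shown in the thread.

```scala
import monix.execution.{Ack, Cancelable}
import monix.reactive.{Observable, OverflowStrategy}

// Hypothetical callback API, standing in for EventHandler / subManager.
trait Listener { def onEvent(data: String): Unit }

final class EventSource {
  @volatile private var listeners = Map.empty[String, Listener]
  def subscribe(address: String, l: Listener): Unit = synchronized { listeners += address -> l }
  def unsubscribe(address: String): Unit = synchronized { listeners -= address }
  def isSubscribed(address: String): Boolean = listeners.contains(address)
  def emit(address: String, data: String): Unit = listeners.get(address).foreach(_.onEvent(data))
}

object CallbackBridge {
  def events(source: EventSource, address: String): Observable[String] =
    Observable.create[String](OverflowStrategy.Unbounded) { sub =>
      source.subscribe(address, new Listener {
        def onEvent(data: String): Unit =
          // Stop means downstream is done, so unsubscribe eagerly.
          if (sub.onNext(data) == Ack.Stop) source.unsubscribe(address)
      })
      // Invoked when the subscription is canceled, even if no further event arrives.
      Cancelable(() => source.unsubscribe(address))
    }
}
```

The Akka variant could follow the same shape, with the returned `Cancelable` stopping the actor instead of completing a promise.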
Ciara O'Brien
@CiaraOBrien
Is there as of yet any vague indication of when some degree of CE3 compatibility might be achieved?
I've been putting off just doing what I need without Monix's help for a while now, in the vain hope that something might turn up
It's probably time to just start implementing a temporary solution and then go back to it later whenever Monix catches up :(
Piotr Gawryś
@Avasil
@CiaraOBrien No, there hasn't been much progress lately so don't let it block you. What do you need from Monix?
Ciara O'Brien
@CiaraOBrien
Well, for it to be compatible with Cats-Effect 3 lol
because I've got a project that's caught between a rock and a hard place, because I already wrote a bunch of it with CE3 but now it would be helpful to have Monix
but alas
Vladimir Bragin
@techlook_gitlab
Hi, dudes) I found that monix could be very useful for me and tried it on enormous streams of data. But when I applied groupBy to the Observable, I found that it groups over the whole stream, which isn't desirable: the stream emits gigabytes of data that is already sorted for grouping, so groupBy consumes a tremendous amount of memory. Is there a sliding (moving) grouping analogue that groups consecutive sorted elements as they arrive instead of processing everything at once? There is bufferWhile, but it doesn't supply the previous value in the stream, so that value would have to be stored in an outside variable, which is the wrong solution in functional programming.
Jasper Moeys
@Jasper-M
With scan you should fairly easily be able to implement a combinator like zipWithPrevious or whatever you are trying to do
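
A minimal sketch of such a combinator built on `scan`, assuming Monix 3.x. The name `zipWithPrevious` comes from the message above; the choice to represent the missing predecessor of the first element as `None` is an assumption.

```scala
import monix.reactive.Observable

object ScanCombinators {
  // Pairs every element with its predecessor; the first element has none.
  def zipWithPrevious[A](source: Observable[A]): Observable[(Option[A], A)] =
    source
      // State is the last emitted pair; None before the first element.
      .scan(Option.empty[(Option[A], A)]) { (state, current) =>
        Some((state.map(_._2), current))
      }
      // Unwrap the Option used for the seed.
      .collect { case Some(pair) => pair }
}
```

Only the previous element is kept in the scan state, so memory use stays constant regardless of stream length.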
Vladimir Bragin
@techlook_gitlab
@Jasper-M thanks. If anyone needs it, this is a moving grouping (by analogy with a moving average):
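def movingGroupBy[T](observableSource: Observable[T])(by: (T, T) => Boolean): Observable[Seq[T]] =
  observableSource
    // Pair every element with its predecessor; the first element has none,
    // which avoids seeding the scan with a null default value.
    .scan(Option.empty[(T, Option[T])]) { (state, current) =>
      Some(current -> state.map(_._1))
    }
    .collect { case Some(pair) => pair }
    // Keep buffering while each element belongs with its predecessor.
    .bufferWhile { case (current, previous) =>
      previous.forall(by(current, _))
    }
    // Drop the bookkeeping and keep only the elements themselves.
    .map(_.map { case (current, _) => current })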
Ciara O'Brien
@CiaraOBrien
Oooh, wire-signals just hit 1.0.0 the other day, maybe I'll give that a shot as a temporary standin for Monix stuff, just to bridge the gap between a push-based listener-based interface on one (Java) executor and the whole cats-effect circus on another unrelated executor
eventually I intend to have the whole code path from listeners to responses be implemented with monix reactive stuff, but for now I'll be happy with just a slightly smoother collision with the leviathan that is cats-effect's execution environment
and wire-signals might do that for me