Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Jasper Moeys
@Jasper-M
Say there is no reference left to the object of which subject is a member, and there are no subscribers left listening to subject. Will subject and all its internal bookkeeping just be garbage collected eventually?
dinosaur
@dinosaur-lin

@oleg-py thanks. our use case is to convert Java call back event driven api to observable. or Akka style of code to observable. The code looks like following. I think the challenge here is the code to create the resource sub are coupled with the code to use resource: either in the event handler sub.onNext or the receive function of akka, subject.onNext. any idea how to rewrite this using higher level observable APIs so that the unsub or context.stop actor can be called reliably?

Observable.create(overflowStrategy) { sub => {
    subManager.sub(address, new EventHandler(
     def onEventUpdate(e: Event): Unit = {
       sub.onNext(e.data) match {
       case Continue => 
       case Stop => 
            subManager.unsub(address)
  }
})
}

or Akka code:

Observable.create(overflowStrategy) { subject => {
   val promise = Promise[Boolean]()
   val actor = context.actorOf(Props(new Actor {
     sub(address)
     override def receive: Receive  = {
      case d: data => 
        subject.onNext(d) match {
          case Continue => 
          case Stop => 
            promise.complete(result = scala.util.Success(true))
        }
   }
}
promise.future.onComplete(_ => {
        context.stop(actor)
      })
}
Rohan Sircar
@rohan-sircar

@Jasper-M

If I have a val subject = ConcurrentSubject.publish[Foo] and I never call subject.onComplete(), does that leak resources?

afaict, yes

@dinosaur-lin you could use a SingleAssignCancellable that sends a stop message to the actor
Rohan Sircar
@rohan-sircar
ime doing it the other way, having a queue as a field in the actor and getting an observable out of it was preferable, but that depends on the specifics of your problem at hand
Ciara O'Brien
@CiaraOBrien
Is there as of yet any vague indication of when some degree of CE3 compatibility might be achieved?
I've been putting off just doing what I need without Monix's help for a while now, in the vain hope that something might turn up
It's probably time to just start implementing a temporary solution and then go back to it later whenever Monix catches up :(
Piotr Gawryś
@Avasil
@CiaraOBrien No, there hasn't been much progress lately so don't let it block you. What do you need from Monix?
Ciara O'Brien
@CiaraOBrien
Well, for it to be compatible with Cats-Effect 3 lol
because I've got a project that's caught between a rock and a hard place, because I already wrote a bunch of it with CE3 but now it would be helpful to have Monix
but alas
Vladimir Bragin
@techlook_gitlab
Hi, dudes) I found that monix could be very useful for me and tried to use it on enormous streams of data. But when I applied function groupBy on the Observable, I found out it groups data over all streamed data and it isn't desirable cuz stream emits gigabytes of sorted for grouping data and thereby the groupBy consumes tremendous memory. Are there sliding (moving) grouping analogies that group ongoing sorted objects in stream instead of to process all big array at once? There is bufferWhile but it doezn't supply previous value in stream therefore it need to be stored in outside variable - it's wrong solution in functional programming.
Jasper Moeys
@Jasper-M
With scan you should fairly easily be able to implement a combinator like zipWithPrevious or whatever you are trying to do
Vladimir Bragin
@techlook_gitlab
@Jasper-M thanks. If anyone will need it, this is moving grouping (by analogy with a moving average):
def movingGroupBy[T](observableSource: Observable[T])(by: (T, T) => Boolean): Observable[Seq[T]] = {
    object Default {
      var value: T = _
    }

    val initialValue = Default.value

    observableSource.scan((initialValue, initialValue)) { case ((previous, _), current) =>
      current -> previous
    }
      .bufferWhile { case (current, previous) =>
        by(current, previous)
      }
      .map {
        _.map {
          case (current, _) => current
        }
      }
  }
Ciara O'Brien
@CiaraOBrien
Oooh, wire-signals just hit 1.0.0 the other day, maybe I'll give that a shot as a temporary standin for Monix stuff, just to bridge the gap between a push-based listener-based interface on one (Java) executor and the whole cats-effect circus on another unrelated executor
eventually I intend to have the whole code path from listeners to responses be implemented with monix reactive stuff, but for now I'll be happy with just a slightly smoother collision with the leviathan that is cats-effect's execution environment
and wire-signals might do that for me
Matthew de Detrich
@mdedetrich
Does anyone know if there are utility/conversion methods to between Observer and java.io.OutputStream or java.io.Writer?
Both are essentially representing the same thing but I am dealing with a Java API that only works with OutputStream/Writer
Roman Janusz
@ghik
I have once written a thing that converts an Observable[T] into an Iterator[T] (obv by introducing blocking)
Trần Tiến Đức
@trantienduchn
I'm using monix-kafka, does anybody know how to close the Java Consumer/Producer inside?
Piotr Gawryś
@Avasil
@trantienduchn KafkaProducer has close method, KafkaProducerSink will close underlying Producer on stream completion/error. Same with KafkaConsumerObservable. Cancelling subscription will work too. You can also use builder methods that allow you to pass your own Consumer/Producer
Joseph Denman
@JosephDenman

Hi all. I have an SBT question. I'm trying to update my libraries to their latest versions, compatible with Scala 2.12.14. I keep seeing this weird error:

[error] stack trace is suppressed; run last core / update for the full output [error] (core / update) found version conflict(s) in library dependencies; some are suspected to be binary incompatible: [error] [error] * org.typelevel:cats-effect_2.12:3.1.0 (early-semver) is selected over 2.5.1 [error] +- midnight-core:midnight-core_2.12:v0.18.1 (depends on 3.1.0) [error] +- io.monix:monix-catnap_2.12:3.4.0 (depends on 2.5.1) [error] [error] [error] this can be overridden using libraryDependencySchemes or evictionErrorLevel [error] Total time: 1 s, completed Jul 16, 2021, 5:32:58 PM

I don't explicitly have monix-catnap as a dependency, and I don't see why it should prefer cats-effect-2.5.1 over 3.1.0. Any ideas? I'm using SBT 1.5.

Piotr Gawryś
@Avasil
@JosephDenman Monix (apart from monix-execution) is not yet compatible with Cats-Effect 3.x
Other Monix modules depend on monix-catnap so that's probably why it's showing up
Arunav Sanyal
@Khalian
hello. I am trying to solve this problem https://stackoverflow.com/questions/68538818/error-handling-on-monix-parallel-tasks-using-parmap and I would appreciate some inputs, hints, help of any sort
Rohan Sircar
@rohan-sircar
@Khalian it's a faq. If you call attempt on the tasks, that will prevent the short-circuiting behavior of parXX, letting you deal with the error values as you see fit.
Arunav Sanyal
@Khalian
awesome, yea that definitely helps. now i get two Eithers and i have the Throwables in Context. Thanks @rohan-sircar
Jasper Moeys
@Jasper-M
@Khalian It could be helpful if you answered your own question on stackoverflow with the solution that you found
Simão Mata
@simao
Hi. Quick question about https://monix.io/docs/current/best-practices/blocking.html Why still use blocking in the last two examples? If we are using a custom/separate thread pool for IO, is a ThreadFactory created like that also aware of scala's blocking ? Or is the example with blocking just valid for when using monix's Scheduler.io ?
Joseph Denman
@JosephDenman
Does anyone know when Monix compatible with Cats Effect 3 is due to be released?
Arunav Sanyal
@Khalian
I have a question about Monix timeouts. I wrote in a timeout for a task to 10 seconds. It works fine at "low scale" but when I have hundreds of thousands of tasks consuming most of my resources (notably CPU clocking close to 90+ percent), the timeout is not honored (it jumps upwards to values like 15 seconds). Is there some way to "aggresively" timeout from Monix.
elyphas
@elyphas

Is it correct to make multiple mapping over an observable and subscribe on these results?
Something like this:

val observable = ...
val obs1 = observable.map { ...
val obs2 = observable.map { ...
val obs3 = observable.map { ...

obs1.subscribe(observer1)
obs2.subscribe(observer2)
obs3.subscribe(observer3)

I have trouble to manage the results because doesn't trigger all of these, sometimes trigger one sometimes trigger another, :).

my code is this: code

elyphas
@elyphas
Well seems that make a multiple subscribe is triggering a request many times and is an error.

oh seems that this:

val obs1 = observable.map { ...

make observable send event multiple times and not is what I want.
I I want to send 1 event and subscribe with multiple observer.

Rohan Sircar
@rohan-sircar
Make the source hot then, using publish and all
elyphas
@elyphas
@rohan-sircar , thank you
Rohan Sircar
@rohan-sircar
@Khalian as i understand, your monix scheduling thread is getting starved.
Arunav Sanyal
@Khalian
@rohan-sircar Yea it basically is.
I am considering a completely different approach to solving my problem that does not involve monix timeouts. The timeout helps my system immensely, but not predictably so. And I fear that even if I tune this approach, at sufficient enough scale, there would always be starvation to prevent monix from doing its job predictably so
elyphas
@elyphas
@rohan-sircar , thank you it works
nrktkt
@nrktkt:matrix.org
[m]

I'm trying to get http4s going with monix, but running into this compiler error when trying to make some simple http routes

[error] Symbol 'type cats.effect.Effect' is missing from the classpath.
[error] This symbol is required by 'class monix.eval.instances.CatsEffectForTask'.
[error] Make sure that type Effect is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'CatsEffectForTask.class' was compiled against an incompatible version of cats.effect.
[error]   val routes = HttpRoutes.of[Task] {
[error]                                    ^

are there any known incompatibilities between monix and http4s? they're both on cats 2 afaik

I'm on monix 3.4 and http4s 0.23
Jasper Moeys
@Jasper-M
@nrktkt:matrix.org Actually http4s 0.23 appears to be on cats-effect 3
Alec Zorab
@AlecZorab
is there an easy way to do Observable[T] => Resource[Task, T]? I'm probably being stupid, but I can't find one
Pau Alarcón
@paualarco
Generally it is used the opposite way Resource[Task, T] => Observable[T], so you initially define the resource that is going to be used for the observable until it completes
Alec Zorab
@AlecZorab
yes, but if I'm trying to conform to someone else's interface, I don't necessarily have that choice
I think I need to do Resource.eval(o.lastL) or something on those lines and just live with the fact it's a bit crap
Pau Alarcón
@paualarco
That would work... I guess that's an edge scenario :D
Alec Zorab
@AlecZorab
I mean perhaps in practice that's actually fine and does exactly what I need, and given that Resource[F, T] represents a single T, there's maybe not much else you can do