Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Ciara O'Brien
@CiaraOBrien
Well, for it to be compatible with Cats-Effect 3 lol
because I've got a project that's caught between a rock and a hard place, because I already wrote a bunch of it with CE3 but now it would be helpful to have Monix
but alas
Vladimir Bragin
@techlook_gitlab
Hi, dudes) I found that monix could be very useful for me and tried to use it on enormous streams of data. But when I applied function groupBy on the Observable, I found out it groups data over all streamed data and it isn't desirable cuz stream emits gigabytes of sorted for grouping data and thereby the groupBy consumes tremendous memory. Are there sliding (moving) grouping analogies that group ongoing sorted objects in stream instead of to process all big array at once? There is bufferWhile but it doezn't supply previous value in stream therefore it need to be stored in outside variable - it's wrong solution in functional programming.
Jasper Moeys
@Jasper-M
With scan you should fairly easily be able to implement a combinator like zipWithPrevious or whatever you are trying to do
Vladimir Bragin
@techlook_gitlab
@Jasper-M thanks. If anyone will need it, this is moving grouping (by analogy with a moving average):
def movingGroupBy[T](observableSource: Observable[T])(by: (T, T) => Boolean): Observable[Seq[T]] = {
    object Default {
      var value: T = _
    }

    val initialValue = Default.value

    observableSource.scan((initialValue, initialValue)) { case ((previous, _), current) =>
      current -> previous
    }
      .bufferWhile { case (current, previous) =>
        by(current, previous)
      }
      .map {
        _.map {
          case (current, _) => current
        }
      }
  }
Ciara O'Brien
@CiaraOBrien
Oooh, wire-signals just hit 1.0.0 the other day, maybe I'll give that a shot as a temporary standin for Monix stuff, just to bridge the gap between a push-based listener-based interface on one (Java) executor and the whole cats-effect circus on another unrelated executor
eventually I intend to have the whole code path from listeners to responses be implemented with monix reactive stuff, but for now I'll be happy with just a slightly smoother collision with the leviathan that is cats-effect's execution environment
and wire-signals might do that for me
Matthew de Detrich
@mdedetrich
Does anyone know if there are utility/conversion methods to between Observer and java.io.OutputStream or java.io.Writer?
Both are essentially representing the same thing but I am dealing with a Java API that only works with OutputStream/Writer
Roman Janusz
@ghik
I have once written a thing that converts an Observable[T] into an Iterator[T] (obv by introducing blocking)
Trần Tiến Đức
@trantienduchn
I'm using monix-kafka, does anybody know how to close the Java Consumer/Producer inside?
Piotr Gawryś
@Avasil
@trantienduchn KafkaProducer has close method, KafkaProducerSink will close underlying Producer on stream completion/error. Same with KafkaConsumerObservable. Cancelling subscription will work too. You can also use builder methods that allow you to pass your own Consumer/Producer
Joseph Denman
@JosephDenman

Hi all. I have an SBT question. I'm trying to update my libraries to their latest versions, compatible with Scala 2.12.14. I keep seeing this weird error:

[error] stack trace is suppressed; run last core / update for the full output [error] (core / update) found version conflict(s) in library dependencies; some are suspected to be binary incompatible: [error] [error] * org.typelevel:cats-effect_2.12:3.1.0 (early-semver) is selected over 2.5.1 [error] +- midnight-core:midnight-core_2.12:v0.18.1 (depends on 3.1.0) [error] +- io.monix:monix-catnap_2.12:3.4.0 (depends on 2.5.1) [error] [error] [error] this can be overridden using libraryDependencySchemes or evictionErrorLevel [error] Total time: 1 s, completed Jul 16, 2021, 5:32:58 PM

I don't explicitly have monix-catnap as a dependency, and I don't see why it should prefer cats-effect-2.5.1 over 3.1.0. Any ideas? I'm using SBT 1.5.

Piotr Gawryś
@Avasil
@JosephDenman Monix (apart from monix-execution) is not yet compatible with Cats-Effect 3.x
Other Monix modules depend on monix-catnap so that's probably why it's showing up
Arunav Sanyal
@Khalian
hello. I am trying to solve this problem https://stackoverflow.com/questions/68538818/error-handling-on-monix-parallel-tasks-using-parmap and I would appreciate some inputs, hints, help of any sort
Rohan Sircar
@rohan-sircar
@Khalian it's a faq. If you call attempt on the tasks, that will prevent the short-circuiting behavior of parXX, letting you deal with the error values as you see fit.
Arunav Sanyal
@Khalian
awesome, yea that definitely helps. now i get two Eithers and i have the Throwables in Context. Thanks @rohan-sircar
Jasper Moeys
@Jasper-M
@Khalian It could be helpful if you answered your own question on stackoverflow with the solution that you found
Simão Mata
@simao
Hi. Quick question about https://monix.io/docs/current/best-practices/blocking.html Why still use blocking in the last two examples? If we are using a custom/separate thread pool for IO, is a ThreadFactory created like that also aware of scala's blocking ? Or is the example with blocking just valid for when using monix's Scheduler.io ?
Joseph Denman
@JosephDenman
Does anyone know when Monix compatible with Cats Effect 3 is due to be released?
Arunav Sanyal
@Khalian
I have a question about Monix timeouts. I wrote in a timeout for a task to 10 seconds. It works fine at "low scale" but when I have hundreds of thousands of tasks consuming most of my resources (notably CPU clocking close to 90+ percent), the timeout is not honored (it jumps upwards to values like 15 seconds). Is there some way to "aggresively" timeout from Monix.
elyphas
@elyphas

Is it correct to make multiple mapping over an observable and subscribe on these results?
Something like this:

val observable = ...
val obs1 = observable.map { ...
val obs2 = observable.map { ...
val obs3 = observable.map { ...

obs1.subscribe(observer1)
obs2.subscribe(observer2)
obs3.subscribe(observer3)

I have trouble to manage the results because doesn't trigger all of these, sometimes trigger one sometimes trigger another, :).

my code is this: code

elyphas
@elyphas
Well seems that make a multiple subscribe is triggering a request many times and is an error.

oh seems that this:

val obs1 = observable.map { ...

make observable send event multiple times and not is what I want.
I I want to send 1 event and subscribe with multiple observer.

Rohan Sircar
@rohan-sircar
Make the source hot then, using publish and all
elyphas
@elyphas
@rohan-sircar , thank you
Rohan Sircar
@rohan-sircar
@Khalian as i understand, your monix scheduling thread is getting starved.
Arunav Sanyal
@Khalian
@rohan-sircar Yea it basically is.
I am considering a completely different approach to solving my problem that does not involve monix timeouts. The timeout helps my system immensely, but not predictably so. And I fear that even if I tune this approach, at sufficient enough scale, there would always be starvation to prevent monix from doing its job predictably so
elyphas
@elyphas
@rohan-sircar , thank you it works
nrktkt
@nrktkt:matrix.org
[m]

I'm trying to get http4s going with monix, but running into this compiler error when trying to make some simple http routes

[error] Symbol 'type cats.effect.Effect' is missing from the classpath.
[error] This symbol is required by 'class monix.eval.instances.CatsEffectForTask'.
[error] Make sure that type Effect is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'CatsEffectForTask.class' was compiled against an incompatible version of cats.effect.
[error]   val routes = HttpRoutes.of[Task] {
[error]                                    ^

are there any known incompatibilities between monix and http4s? they're both on cats 2 afaik

I'm on monix 3.4 and http4s 0.23
Jasper Moeys
@Jasper-M
@nrktkt:matrix.org Actually http4s 0.23 appears to be on cats-effect 3
Alec Zorab
@AlecZorab
is there an easy way to do Observable[T] => Resource[Task, T]? I'm probably being stupid, but I can't find one
Pau Alarcón
@paualarco
Generally it is used the opposite way Resource[Task, T] => Observable[T], so you initially define the resource that is going to be used for the observable until it completes
Alec Zorab
@AlecZorab
yes, but if I'm trying to conform to someone else's interface, I don't necessarily have that choice
I think I need to do Resource.eval(o.lastL) or something on those lines and just live with the fact it's a bit crap
Pau Alarcón
@paualarco
That would work... I guess that's an edge scenario :D
Alec Zorab
@AlecZorab
I mean perhaps in practice that's actually fine and does exactly what I need, and given that Resource[F, T] represents a single T, there's maybe not much else you can do
elyphas
@elyphas
Hi, is it right to do the validation process on the filter step?
I want to validate the fields of a case class and if it is valid then send to the observer
Something like this:
val validatedMainItem = hdlMainItem
                           .filter { x =>
                               Inputs.validate(x) match {
                                   case Valid(valid) =>
                                     true
                                   case Invalid(errors) =>
                                     val err = errors.foldLeft(""){ case (acc, next) => acc + "\n" + next }.mkString("\n")
                                     alert(err)
                                     false
                               }
                           }.map { x =>
                               Inputs.validate(x) match {
                                 case Valid(valid) => valid
                                 case Invalid(error) => Inputs()
                               }
                           }
3 replies
Volodymyr Barna
@RobertoUa
When monix for cats-effect 3 is going to be available?
Rohan Sircar
@rohan-sircar
We're all waiting for it, better get in line.
Alec Zorab
@AlecZorab

I mean perhaps in practice that's actually fine and does exactly what I need, and given that Resource[F, T] represents a single T, there's maybe not much else you can do

I wonder if there's potential for a subtype of Observable[T] where the grammar is onNext? (onComplete | onError)?, ie you'll get at most one T. There's quite a few places in my code where I really want a "Task-with-cleanup", and either I have to use Resource, which is clunky or Observable which doesn't assert to consumers that there's only one element

Oleg Pyzhcov
@oleg-py
That would be a bad case for the protocol. Remember that something else calls onComplete there, so it's not that you let go of cleanable thing, it'd be like third party telling you to wrap up your business.
I just got used to Resource instead :D
Alec Zorab
@AlecZorab
I don't understand how what I described is any different to the reality of the behaviour you get from Observable.fromResource?
Oleg Pyzhcov
@oleg-py
fromResource relies on the backpressure after onNext, so it's reliant on monix-specific divergence from Rx family of libraries in that internal protocol. It's a thing you can do, but it falls apart in some fairly innocuous scenarios and a lot of interop ones - and Alex argues having it is a mistake (see #1427)
Arunav Sanyal
@Khalian
is there a way to control configure how long a Task's memoizeOnSuccess actually memoizes a successful result. A coworker used this as a cache and may have messed up the timing (we are memoizing aws credentials but I suspect we memoized it for longer than its expiry time somehow)