Roman Janusz
@ghik
I have once written a thing that converts an Observable[T] into an Iterator[T] (obv by introducing blocking)
Trần Tiến Đức
@trantienduchn
I'm using monix-kafka, does anybody know how to close the Java Consumer/Producer inside?
Piotr Gawryś
@Avasil
@trantienduchn KafkaProducer has close method, KafkaProducerSink will close underlying Producer on stream completion/error. Same with KafkaConsumerObservable. Cancelling subscription will work too. You can also use builder methods that allow you to pass your own Consumer/Producer
Joseph Denman
@JosephDenman

Hi all. I have an SBT question. I'm trying to update my libraries to their latest versions, compatible with Scala 2.12.14. I keep seeing this weird error:

[error] stack trace is suppressed; run last core / update for the full output
[error] (core / update) found version conflict(s) in library dependencies; some are suspected to be binary incompatible:
[error]
[error] 	* org.typelevel:cats-effect_2.12:3.1.0 (early-semver) is selected over 2.5.1
[error] 	    +- midnight-core:midnight-core_2.12:v0.18.1 (depends on 3.1.0)
[error] 	    +- io.monix:monix-catnap_2.12:3.4.0 (depends on 2.5.1)
[error]
[error]
[error] this can be overridden using libraryDependencySchemes or evictionErrorLevel
[error] Total time: 1 s, completed Jul 16, 2021, 5:32:58 PM

I don't explicitly have monix-catnap as a dependency, and I don't see why it should prefer cats-effect-2.5.1 over 3.1.0. Any ideas? I'm using SBT 1.5.

Piotr Gawryś
@Avasil
@JosephDenman Monix (apart from monix-execution) is not yet compatible with Cats-Effect 3.x
Other Monix modules depend on monix-catnap so that's probably why it's showing up
Arunav Sanyal
@Khalian
hello. I am trying to solve this problem https://stackoverflow.com/questions/68538818/error-handling-on-monix-parallel-tasks-using-parmap and I would appreciate some inputs, hints, help of any sort
Rohan Sircar
@rohan-sircar
@Khalian it's a faq. If you call attempt on the tasks, that will prevent the short-circuiting behavior of parXX, letting you deal with the error values as you see fit.
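A minimal sketch of that idea, assuming Monix 3.x; the two tasks and the object name are hypothetical stand-ins. Calling attempt on each task turns failures into Left values, so parMap2 only ever sees successful tasks and does not short-circuit:

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

object AttemptSketch {
  // Hypothetical tasks: one succeeds, one fails.
  val ok: Task[Int] = Task(1)
  val failing: Task[Int] = Task.raiseError(new RuntimeException("boom"))

  // attempt converts Task[A] into Task[Either[Throwable, A]], so a failure
  // in one task no longer cancels the other or fails the combined task.
  val both: Task[(Either[Throwable, Int], Either[Throwable, Int])] =
    Task.parMap2(ok.attempt, failing.attempt)((a, b) => (a, b))

  def main(args: Array[String]): Unit =
    println(both.runSyncUnsafe())
}
```

Each Either can then be inspected separately, for example to log the Throwable on the Left and keep the value on the Right.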
Arunav Sanyal
@Khalian
awesome, yea that definitely helps. now i get two Eithers and i have the Throwables in Context. Thanks @rohan-sircar
Jasper Moeys
@Jasper-M
@Khalian It could be helpful if you answered your own question on stackoverflow with the solution that you found
Simão Mata
@simao
Hi. Quick question about https://monix.io/docs/current/best-practices/blocking.html Why still use blocking in the last two examples? If we are using a custom/separate thread pool for IO, is a ThreadFactory created like that also aware of Scala's blocking? Or is the example with blocking only valid when using Monix's Scheduler.io?
Joseph Denman
@JosephDenman
Does anyone know when Monix compatible with Cats Effect 3 is due to be released?
Arunav Sanyal
@Khalian
I have a question about Monix timeouts. I set a timeout of 10 seconds on a task. It works fine at "low scale", but when I have hundreds of thousands of tasks consuming most of my resources (notably CPU close to 90+ percent), the timeout is not honored (it jumps to values like 15 seconds). Is there some way to "aggressively" time out in Monix?
elyphas
@elyphas

Is it correct to map over an observable several times and subscribe to each of the results?
Something like this:

val observable = ...
val obs1 = observable.map { ...
val obs2 = observable.map { ...
val obs3 = observable.map { ...

obs1.subscribe(observer1)
obs2.subscribe(observer2)
obs3.subscribe(observer3)

I have trouble managing the results because not all of them are triggered; sometimes one fires, sometimes another. :)

my code is this: code

elyphas
@elyphas
Well, it seems that subscribing multiple times triggers the request multiple times, which is an error.

Oh, it seems that this:

val obs1 = observable.map { ...

makes the observable send the event multiple times, which is not what I want.
I want to send one event and subscribe with multiple observers.

Rohan Sircar
@rohan-sircar
Make the source hot then, using publish and all
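A rough sketch of what that can look like, assuming Monix 3.x; the source, the counter and the object name are hypothetical. publish turns the cold source into a ConnectableObservable: subscribers attach first, and the source runs once when connect() is called:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object HotSourceSketch {
  val requests = new AtomicInteger(0)             // counts how often the source runs
  val seen = new ConcurrentLinkedQueue[String]()  // what the subscribers received

  // Hypothetical cold source standing in for the request.
  val source: Observable[Int] =
    Observable.eval { requests.incrementAndGet(); 21 }

  def run(): Unit = {
    val hot = source.publish // nothing runs yet

    // Both derived streams share the single upstream subscription.
    val f1 = hot.map(_ * 2).foreach { x => seen.add(s"doubled=$x"); () }
    val f2 = hot.map(_.toString).foreach { x => seen.add(s"text=$x"); () }

    hot.connect() // subscribe to the source exactly once
    Await.result(f1, 5.seconds)
    Await.result(f2, 5.seconds)
  }
}
```

Subscribers that attach after connect() miss events that were already emitted, so the order (subscribe first, connect last) matters.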
elyphas
@elyphas
@rohan-sircar , thank you
Rohan Sircar
@rohan-sircar
@Khalian as i understand, your monix scheduling thread is getting starved.
Arunav Sanyal
@Khalian
@rohan-sircar Yea it basically is.
I am considering a completely different approach to solving my problem that does not involve Monix timeouts. The timeout helps my system immensely, but not predictably. And I fear that even if I tune this approach, at a large enough scale there would always be starvation that prevents Monix from doing its job predictably.
elyphas
@elyphas
@rohan-sircar , thank you it works
nrktkt
@nrktkt:matrix.org

I'm trying to get http4s going with monix, but running into this compiler error when trying to make some simple http routes

[error] Symbol 'type cats.effect.Effect' is missing from the classpath.
[error] This symbol is required by 'class monix.eval.instances.CatsEffectForTask'.
[error] Make sure that type Effect is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'CatsEffectForTask.class' was compiled against an incompatible version of cats.effect.
[error]   val routes = HttpRoutes.of[Task] {
[error]                                    ^

are there any known incompatibilities between monix and http4s? they're both on cats 2 afaik

I'm on monix 3.4 and http4s 0.23
Jasper Moeys
@Jasper-M
@nrktkt:matrix.org Actually http4s 0.23 appears to be on cats-effect 3
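For reference, a build.sbt fragment that keeps both libraries on cats-effect 2. This is a sketch: the http4s 0.22.x line is the one built against cats-effect 2, and the exact version numbers below are assumptions to check against the current releases.

```scala
// build.sbt fragment (sketch; verify the version numbers before use)
libraryDependencies ++= Seq(
  "io.monix"   %% "monix"      % "3.4.0",
  // http4s 0.22.x targets cats-effect 2; 0.23.x targets cats-effect 3
  "org.http4s" %% "http4s-dsl" % "0.22.0"
)
```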
Alec Zorab
@AlecZorab
is there an easy way to do Observable[T] => Resource[Task, T]? I'm probably being stupid, but I can't find one
Pau Alarcón
@paualarco
It is generally used the other way around, Resource[Task, T] => Observable[T]: you first define the resource that the observable will use until it completes
Alec Zorab
@AlecZorab
yes, but if I'm trying to conform to someone else's interface, I don't necessarily have that choice
I think I need to do Resource.eval(o.lastL) or something on those lines and just live with the fact it's a bit crap
Pau Alarcón
@paualarco
That would work... I guess that's an edge scenario :D
Alec Zorab
@AlecZorab
I mean perhaps in practice that's actually fine and does exactly what I need, and given that Resource[F, T] represents a single T, there's maybe not much else you can do
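Spelled out, that idea could look like the sketch below, assuming Monix 3.4 with cats-effect 2.4 or later (where Resource.eval is available); the helper and object names are hypothetical:

```scala
import cats.effect.Resource
import monix.eval.Task
import monix.reactive.Observable

object LastAsResourceSketch {
  // Waits for the stream to complete, then exposes its final element.
  // lastL fails with NoSuchElementException if the stream is empty.
  def lastAsResource[T](o: Observable[T]): Resource[Task, T] =
    Resource.eval(o.lastL)
}
```

The resulting Resource has no finalizer of its own: by the time the value is available the observable has already completed, so any cleanup tied to the stream has already run.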
elyphas
@elyphas
Hi, is it right to do the validation process on the filter step?
I want to validate the fields of a case class and if it is valid then send to the observer
Something like this:
val validatedMainItem = hdlMainItem
                           .filter { x =>
                               Inputs.validate(x) match {
                                   case Valid(valid) =>
                                     true
                                   case Invalid(errors) =>
                                     val err = errors.foldLeft(""){ case (acc, next) => acc + "\n" + next }
                                     alert(err)
                                     false
                               }
                           }.map { x =>
                               Inputs.validate(x) match {
                                 case Valid(valid) => valid
                                 case Invalid(error) => Inputs()
                               }
                           }
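One possible way to avoid validating every element twice, sketched under assumptions: Item and validate are hypothetical stand-ins for the case class and Inputs.validate, and report stands in for alert. The stream is validated once, failures are reported as a side effect, and collect keeps only the valid values:

```scala
import cats.data.{NonEmptyList, Validated}
import cats.data.Validated.{Invalid, Valid}
import monix.eval.Task
import monix.reactive.Observable

object ValidateOnceSketch {
  // Hypothetical stand-ins for the case class and Inputs.validate.
  final case class Item(name: String)

  def validate(item: Item): Validated[NonEmptyList[String], Item] =
    if (item.name.nonEmpty) Valid(item)
    else Invalid(NonEmptyList.one("name must not be empty"))

  // Validate each element once, report failures as a side effect,
  // and let only the valid values through.
  def validated(source: Observable[Item], report: String => Unit): Observable[Item] =
    source
      .map(validate)
      .doOnNext {
        case Invalid(errors) => Task(report(errors.toList.mkString("\n")))
        case Valid(_)        => Task.unit
      }
      .collect { case Valid(item) => item }
}
```

Compared with filter followed by map, this needs no placeholder value for the invalid branch, because invalid elements never reach the downstream observer.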
Volodymyr Barna
@RobertoUa
When is Monix for cats-effect 3 going to be available?
Rohan Sircar
@rohan-sircar
We're all waiting for it, better get in line.
Alec Zorab
@AlecZorab

I mean perhaps in practice that's actually fine and does exactly what I need, and given that Resource[F, T] represents a single T, there's maybe not much else you can do

I wonder if there's potential for a subtype of Observable[T] where the grammar is onNext? (onComplete | onError)?, ie you'll get at most one T. There's quite a few places in my code where I really want a "Task-with-cleanup", and either I have to use Resource, which is clunky or Observable which doesn't assert to consumers that there's only one element

Oleg Pyzhcov
@oleg-py
That would be a bad case for the protocol. Remember that something else calls onComplete there, so it's not that you let go of cleanable thing, it'd be like third party telling you to wrap up your business.
I just got used to Resource instead :D
Alec Zorab
@AlecZorab
I don't understand how what I described is any different to the reality of the behaviour you get from Observable.fromResource?
Oleg Pyzhcov
@oleg-py
fromResource relies on the backpressure after onNext, so it's reliant on monix-specific divergence from Rx family of libraries in that internal protocol. It's a thing you can do, but it falls apart in some fairly innocuous scenarios and a lot of interop ones - and Alex argues having it is a mistake (see #1427)
Arunav Sanyal
@Khalian
is there a way to configure how long a Task's memoizeOnSuccess actually memoizes a successful result? A coworker used this as a cache and may have messed up the timing (we are memoizing aws credentials but I suspect we memoized them for longer than their expiry time somehow)
Oleg Pyzhcov
@oleg-py
No. Once memoized, it's there for as long as you have a reference to that particular task
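If an expiring value is what is needed, one option is to swap in a freshly memoized task once a time-to-live has passed. The sketch below is a hypothetical helper, not part of Monix; it measures the TTL from the moment the entry is created rather than from the moment the fetch completes:

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.FiniteDuration
import monix.eval.Task

// Hypothetical helper, not a Monix API.
final class ExpiringMemo[A](fetch: Task[A], ttl: FiniteDuration) {
  // Creation time (nanoseconds) paired with a memoized copy of `fetch`.
  private[this] val ref =
    new AtomicReference[Option[(Long, Task[A])]](None)

  def get: Task[A] = Task.defer {
    val now = System.nanoTime()
    ref.get() match {
      // Entry is still fresh: reuse the memoized task.
      case Some((createdAt, cached)) if now - createdAt < ttl.toNanos =>
        cached
      // Missing or expired: install a new memoized task, retrying on a lost race.
      case stale =>
        val fresh = fetch.memoizeOnSuccess
        if (ref.compareAndSet(stale, Some((now, fresh)))) fresh else get
    }
  }
}
```

Holding on to a single memoizeOnSuccess task, by contrast, keeps its result for as long as that task is referenced.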
Arunav Sanyal
@Khalian
thanks
@oleg-py Thanks. I was expecting an answer like that
yea i think we just messed up our caching (instead ended up memoizing the results) and now we cannot reason about our code
Alec Zorab
@AlecZorab
anyone got any thoughts or prior art on implementing a headAndTail for observable that does Observable[A] => Task[(A, Observable[A])]? Would also accept the return type being Observable[(A, Observable[A])] ;)
elyphas
@elyphas

Hi, is this right?

    val obsMainItem =
        publishOnEventDocument
            //.dump("In obsMainItem ************")
            .filter {
                case Right(value@_) => true
                case Left(err@_) => false
            }
            .map {
                case Right((oldItem, newItem, event)) =>
                    event match {
                        case NotFoundDoobie(msg) => oldItem
                        case _ => newItem
                    }
                case _ => ViewProcess() // This case should never be reached
            }

The Left(err) side is for another observable where I handle the errors.

    val obsProcessSideEffects =
        publishOnEventDocument
            //.dump("Darn, in processSideEffects **************")
            .map {
                case Right((oldItem@_, newItem@_, event)) =>
                    event match {
                        case FoundDoobie(x) => div(x)
                        case SavedDoobie(x) => div(x)
                        case DeletedDoobie(x) => div(x)
                        case NotFoundDoobie(x) => div(x)
                        case ErrorDoobie(x) => div(x)
                    }
                case Left(error) => error
            }
Jasper Moeys
@Jasper-M
@AlecZorab Don't know if that counts as prior art but in fs2 you can have
def uncons[A](stream: Stream[Task, A]): Stream[Task, (A, Stream[Task, A])] =
  stream.pull.uncons1.flatMap(Pull.output1).stream.unNone
Alec Zorab
@AlecZorab
yeah, I'd like that but in monix :laughing:
Jasper Moeys
@Jasper-M
Monix Iterant also has an uncons :p Don't know if that's coincidence or if pull based streams are inherently better suited for that operation
Alec Zorab
@AlecZorab
Is there a less clunky way to go from Task[Observable[A]] to Observable[A] than t.to[Observable].flatten?
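For comparison, an equivalent spelling using Observable.fromTask, assuming Monix 3.x. It is arguably no shorter, but it can be wrapped once in a helper (the helper and object names here are hypothetical):

```scala
import monix.eval.Task
import monix.reactive.Observable

object FlattenTaskSketch {
  // Runs the task on subscription, then streams the elements of the
  // observable it produced.
  def flattenTask[A](t: Task[Observable[A]]): Observable[A] =
    Observable.fromTask(t).flatten
}
```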
rms264
@rms264
Creating a long-running task with Monix Kafka to listen to a list of topics. Should I use an IO scheduler for the KafkaConsumerObservable or a ForkJoinPool? Also, should I have a separate scheduler for the consumer, or just one and use it everywhere?