Observable[T]
into an Iterator[T]
(obv by introducing blocking)
KafkaProducer
has close
method, KafkaProducerSink
will close underlying Producer on stream completion/error. Same with KafkaConsumerObservable
. Cancelling subscription will work too. You can also use builder methods that allow you to pass your own Consumer/Producer
Hi all. I have an SBT question. I'm trying to update my libraries to their latest versions, compatible with Scala 2.12.14. I keep seeing this weird error:
[error] stack trace is suppressed; run last core / update for the full output
[error] (core / update) found version conflict(s) in library dependencies; some are suspected to be binary incompatible:
[error]
[error] * org.typelevel:cats-effect_2.12:3.1.0 (early-semver) is selected over 2.5.1
[error] +- midnight-core:midnight-core_2.12:v0.18.1 (depends on 3.1.0)
[error] +- io.monix:monix-catnap_2.12:3.4.0 (depends on 2.5.1)
[error]
[error]
[error] this can be overridden using libraryDependencySchemes or evictionErrorLevel
[error] Total time: 1 s, completed Jul 16, 2021, 5:32:58 PM
I don't explicitly have monix-catnap
as a dependency, and I don't see why it should prefer cats-effect-2.5.1
over 3.1.0
. Any ideas? I'm using SBT 1.5.
blocking
in the last two examples? If we are using a custom/separate thread pool for IO, is a ThreadFactory
created like that also aware of scala's blocking
? Or is the example with blocking
just valid for when using monix's Scheduler.io
?
Is it correct to make multiple mapping over an observable and subscribe on these results?
Something like this:
val observable = ...
val obs1 = observable.map { ...
val obs2 = observable.map { ...
val obs3 = observable.map { ...
obs1.subscribe(observer1)
obs2.subscribe(observer2)
obs3.subscribe(observer3)
I have trouble to manage the results because doesn't trigger all of these, sometimes trigger one sometimes trigger another, :).
my code is this: code
oh seems that this:
val obs1 = observable.map { ...
make observable send event multiple times and not is what I want.
I I want to send 1 event and subscribe with multiple observer.
I'm trying to get http4s going with monix, but running into this compiler error when trying to make some simple http routes
[error] Symbol 'type cats.effect.Effect' is missing from the classpath.
[error] This symbol is required by 'class monix.eval.instances.CatsEffectForTask'.
[error] Make sure that type Effect is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'CatsEffectForTask.class' was compiled against an incompatible version of cats.effect.
[error] val routes = HttpRoutes.of[Task] {
[error] ^
are there any known incompatibilities between monix and http4s? they're both on cats 2 afaik
Resource.eval(o.lastL)
or something on those lines and just live with the fact it's a bit crap
filter
step?case class
and if it is valid then send to the observer
val validatedMainItem = hdlMainItem
.filter { x =>
Inputs.validate(x) match {
case Valid(valid) =>
true
case Invalid(errors) =>
val err = errors.foldLeft(""){ case (acc, next) => acc + "\n" + next }.mkString("\n")
alert(err)
false
}
}.map { x =>
Inputs.validate(x) match {
case Valid(valid) => valid
case Invalid(error) => Inputs()
}
}
I mean perhaps in practice that's actually fine and does exactly what I need, and given that
Resource[F, T]
represents a singleT
, there's maybe not much else you can do
I wonder if there's potential for a subtype of Observable[T]
where the grammar is onNext? (onComplete | onError)?
, ie you'll get at most one T
. There's quite a few places in my code where I really want a "Task-with-cleanup", and either I have to use Resource
, which is clunky or Observable
which doesn't assert to consumers that there's only one element
Resource
instead :D
fromResource
relies on the backpressure after onNext
, so it's reliant on monix-specific divergence from Rx family of libraries in that internal protocol. It's a thing you can do, but it falls apart in some fairly innocuous scenarios and a lot of interop ones - and Alex argues having it is a mistake (see #1427)
hi, this is right?
val obsMainItem =
publishOnEventDocument
//.dump("En el obsMainItem ************")
.filter {
case Right(value@_) => true
case Left(err@_) => false
}
.map {
case Right((oldItem, newItem, event)) =>
event match {
case NotFoundDoobie(msg) => oldItem
case _ => newItem
}
case _ => ViewProcess() //Here never must be reach
}
The Left(err)
side is for another observable where I handle the errors.
val obsProcessSideEffects =
publishOnEventDocument
//.dump("Changos en el processSideEffects **************")
.map {
case Right((oldItem@_, newItem@_, event)) =>
event match {
case FoundDoobie(x) => div(x)
case SavedDoobie(x) => div(x)
case DeletedDoobie(x) => div(x)
case NotFoundDoobie(x) => div(x)
case ErrorDoobie(x) => div(x)
}
case Left(error) => error
}