Where communities thrive

  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
Repo info
Pau Alarcón

Does Monix work with Cats Effect 3?

Not yet I am afraid

Henri Cook
Funnily enough I was just looking for an issue tracking this - is there anywhere I could check in on regularly to see how Cats Effect 3 work is going?
Piotr Gawryś

The most up to date info can be found in Typevel discord, Alex is active there: https://discord.gg/hWd4eS244g
He answered a question about the CE3 roadmap recently:

Good question. I'm trying to come up with one and it is ... complicated, because it needs to have an identity that's different from Cats-Effect & fs2.
The value of Monix is in its pragmatism, its approach to FP that allows for impure interfaces to live alongside the pure ones (with established conventions), its deep integration with the host, its coherence, its fairly stable backwards compatibility.
Unfortunately, the task at hand on deciding what to do next, isn't easy to solve, and personally I've been away, on what people call an OSS hiatus. Trying to get back on that ride though.
In other news, in private I'm experimenting with a new Task. I have ideas and I'm foreseeing a philosophical divergence from cats.effect.IO. If CE evolves towards Task, the question would have been if Monix's implementation makes any sense, but CE 3 has actually moved further from Task's philosophy, and now I'm thinking Task can continue to provide value, while integrating CE3 ideas and also implement its type-classes.
Anyway, I was thinking of getting together with Monix contributors, discuss current issues, decide on a road-map, and call for help maybe, as there's plenty to work on.
This does mean that we are nowhere near a new major release. Sorry for not being able to give you a better answer.

Yannick Gladow
Hey maybe you can help me, let's say I want to fetch some data every seconds and then process it afterwards.
And I need to make sure that everything I fetch is 100% processed and never dropped, even when the application is shutdown.
What's the best way to make this work?
Right now I am thinking of making it uncancelable and then in the doOnCancel of the Task somehow stopping further fetches, sending a Stop signal upstream, but upstream from the processing part, as that needs to happen.
      .flatMap(a => Observable.fromTask(Task.delay(println(a)).as(a))) // Fetching something from API, once fetched we need to ensure it is processed
      .mapEval(a => Task.sleep(2.seconds).as(a))
      .consumeWith(Consumer.foreachParallelTask[Long](1)(a => Task.delay(println(s"arrived at the end $a")))) // This step always HAS to be invoked
      .doOnCancel(Task.sleep(10.seconds) *> Task.delay(println("CIAO")))

Hi, someone could tell me, please.
Why this code isn't working properly?

I want to request on different links, when the Observable triggers an event.

val eventComparative = PublishSubject[String]()
val eventComparativePublish = eventComparative.publish
val cancelEventComparativePublish = eventComparativePublish.connect()

val getMinimosRetensionISR = eventComparativePublish.dump("0").mapEval { case idComparative => ...
val getProcess =  eventComparativePublish.dump("0").mapEval { case idComparative => ...
val getComparativo =  eventComparativePublish.dump("0").mapEval { case idComparative => ...
it is collected this way:
val observableZip3 = Observable.zip3(getMinimosRetensionISR, getProcess, getComparativo)
Pau Alarcón
Perhaps you could share a bit more about your use case? In that snippet I don't see events being emitted
@paualarco ; sorry, I will
  val cmdPrint = button("Imprimir", cls := "myButton",
                      SyncIO ( onClick.transformLifted { event: Observable[MouseEvent] =>
                              .withLatestFrom(getIdComparative) {
                                case (event@_, idComparative: String) =>
                          }} --> repoCompara.eventComparative
and thank you very much ! :-)
this is the full code where the PublishSubject live: Repository
Pau Alarcón
The repository you've shared is not complete as it has external dependencies, also looks complex to understand. Also don't know what's the problem hehe
See a similar small example to the first you shared, but in this case it shows a bit more how to run the program and emit events to the publisher:
  val publisher = PublishSubject[String]().publish
  val cancelEventComparativePublish = publisher.connect()
  val o1 = eventComparativePublish.dump("Event received!")
  val publishTask = Observable.repeatEvalF(Task(eventComparative.onNext("Hi")))
  Observable.zip2(o1, publishTask).completedL.runSyncUnsafe()
@paualarco , sorry,
Pau Alarcón
Hope this is helpful
@paualarco; ah thank you, I will try
@paualarco , it works thank you, :-)
Trần Tiến Đức
Did any one got a problem of missing/mixed/incorrect stacktraces? In my application, I use chains of Observables mixed with Tasks with alot of parMap2, 3, 4, 5. So usually, if any error happens, the stack traces doesn't report fully where the error happend, or pointing to the wrong lines from another source code file, those lines usually point to Observale. firstL or consumeWith... so weird

Hi ,
I have a below use case .
Execute DB operations in async, after that is done send out a kafka event to another microservice so that it reads from DB. However as of now the kafka event is being sent even before the DB operation is complete. My code looks as below :

firstTask = Task(doSomeDBOperation).executeOn(io).asyncBoundary
secondTask = Task(doSomeDBOperation).executeOn(io).asyncBoundary
thirdTask = Task(doSomeDBUpdate).executeOn(io).asyncBoundary

Task.sequence(Seq(firstTask, secondTask, thirdTask, pushToKafkaTask))

Is there any way to ensure pushToKafkaTask surely happens after the first three task ?

Rohan Sircar
@Sourabh-s more like this
firstTask = Task(doSomeDBOperation).executeOn(io)
  secondTask = Task(doSomeDBOperation).executeOn(io)
  thirdTask = Task(doSomeDBUpdate).executeOn(io)
  for {
    _ <- Task.parSequence(List(firstTask, secondTask, thirdTask))
    _ <- pushToKafkaTask
  } yield ()
For parallel computations you use applicative methods, for sequential computations you use Monad methods (flatMap)


Task.parSequence(List(firstTask, secondTask, thirdTask)).doOnFinish(_ => pushToKafkaTask)

if you want try/finally semantics

Jasper Moeys
Task.sequence should be sequential / Monad though
Rohan Sircar
I thought so too. Maybe it's because of the asyncboundary calls?
@rohan-sircar yeah it seems to be because of the asyncboundary calls. Sadly even the solution given by you does not work .
I will be adding further code snippet below to show what firstTask and secondTask look like :
val firstTask = dbOperation1(k)

def dbOperation1(k: objPUT)(jt: JdbcTemplate, io: Scheduler): Task[Int] = {
    val params = Array(user.userId, DateUtils.currentTimestamp, k.getId)

    Task(jt.update(tDao.tUpdate, params: _*)).executeOn(io).asyncBoundary

val secondTask = dbOperation2(t.getId, t, existing)

def dbOperation2(id: String,input: objPUTGen, existing: objPUTGen = null,iStd: Boolean = true, 
                 isNSRefUpdate: Boolean = false,)(implicit user: UserDetails, jt: JdbcTemplate): Task[_] =

    Task.sequence(Seq(dbOperation3(id, input),
                      if (iStd)  dbOperation4( id, input) else Task.unit, dbOperation5(id, input, existing, isNSRefUpdate) ))

def dbOperation3(id: String, input: TemplateGeneric)(implicit user: UserDetails, jt: JdbcTemplate, io: Scheduler): Task[_] = {

    val sDel =
         | delete from "tableName"
         | where ID = ?

    Task(jt.update(sDel, id)).executeOn(io).asyncBoundary
Rohan Sircar
Well.. it should work. Code above looks fine, the problem could be in pushToKafka if it has un-suspended side effects. Otherwise idk ¯\(ツ)
I have question, I'm currently looking for a logged exception that seems to be reported to the sheduler. I supposed I could "catch" it if I do somthing as onErrorFallbackTo(Task.unit) just before any startAndForget. It's do not work so I'm wondering if the it's not reported what ever it's catch or not in UnfoldObservable.unsafeSubscribeFn.
Do someone have an advice that could help me to properly control logs with (supposly) async exception?
1 reply
Mateusz Zakarczemny
How do you guys manage java memory when running in container environment (eg k8s) while using monix io sheduler (Scheduler.io())
IO scheduler is unbounded so it may spawn undefined number of threads. Threads keep stacks out of java heap space (xmx) so we cannot limit them.
Therefore in case of huge number of IO operations using Scheduler.io may lead to OOM errors on the container.
Do you deal with that / manage that somehow?
Vish Ramachandran
This is a question on monix-kafka. I want to be able to create a consumer Observable by subscribing to a specific partition number, of a specific topic, starting at a specific offset. I manage checkpointing outside kafka. Is this possible ? This requires using Kafka's assign api and not the usual subscribe api on the KafkaConsumer object. Is this supported ? Could I get some pointers please ?
1 reply
If not supported, what is the recommended workaround ?
Anirudh Vyas
hi I am using monix scheduler to parallelize a long running workload, I need system to be up and running, for now I thought ok maybe I will have Thread.sleep(30 days) to do it, but I need a more elegant way to do this - any suggestions?
Eliav Lavi
Hello 👋 funny question - am I missing something or Task.materialize & Task.attempt are effectually the same thing?
Peter Nham
Is there an easy way to go from Observable[ByteBuffer] => Observable[String]. I've attempted to use monix-nio utf decoding, but it's not splitting correctly on the line ends
Alex Nedelcu
Anirudh Vyas

hi there, I am trying to assemble a fat jar with monix using sbt assembly and I am getting these errors:

[error] deduplicate: different file contents found in the following:
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/io/monix/monix-internal-jctools_3/3.4.0/monix-internal-jctools_3-3.4.0.jar:scala/runtime/function/JProcedure18.class
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/scala-lang/scala3-library_3/3.1.2/scala3-library_3-3.1.2.jar:scala/runtime/function/JProcedure18.class
[error] deduplicate: different file contents found in the following:

any suggestions for me?

my build sbt has :
  libraryDependencies += "io.monix" %% "monix" % "3.4.0",
this is my assembly setting:
lazy val assemblySettings = Seq(
  assembly / assemblyMergeStrategy := {
    case PathList("module-info.class") => MergeStrategy.discard
    case path if path.contains("META-INF/services") => MergeStrategy.concat
    case PathList("META-INF", xs@_*) => MergeStrategy.discard
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  assembly / test := {} // skip unit test when build uber jar
Alex Nedelcu
@AnirudhVyas: this is a known issue, will release a new version with the fix soon.
Alex Nedelcu
@AnirudhVyas: try version the released 3.4.1
Pierre Marais

Good morning. Will a Local or ThreadLocal be propagated when converting Task -> Future -> Task? Looks to me like this isn't working. Any way I can ensure propagation here?

implicit val scheduler = Scheduler.traced
val local = Local(0)

for {
  _     <- Task.fromFuture(Task(local.update(1)).runToFuture)
  value <- Task(local.get)
} yield println(s"Local value in Future $value")

Await.result(f.runToFuture, Duration.Inf)

It prints Local value in Future 0

Even Local.isolate doesn't work here

Pierre Marais
Also tried explicitly with .runToFutureOpt(scheduler, Task.Options(true, true)
Alexandru Nedelcu

@Deeds67 I don't know why your particular sample doesn't work, but I can say that there are issues in this integration, as the propagation mechanisms are different.

Try running those tasks by setting an implicit Task.Options(localContextPropagation=true) in that context, and see if that works. Or switch to usage of a TaskLocal.

Pierre Marais
Thanks, TaskLocal works :+1: