Hi,
I have the below use case.
Execute DB operations asynchronously, and once they are done, send out a Kafka event to another microservice so that it reads from the DB. However, as of now the Kafka event is being sent even before the DB operation is complete. My code looks like this:

val firstTask = Task(doSomeDBOperation).executeOn(io).asyncBoundary
val secondTask = Task(doSomeDBOperation).executeOn(io).asyncBoundary
val thirdTask = Task(doSomeDBUpdate).executeOn(io).asyncBoundary

Task.sequence(Seq(firstTask, secondTask, thirdTask, pushToKafkaTask))

Is there any way to ensure pushToKafkaTask runs only after the first three tasks have completed?

Rohan Sircar
@Sourabh-s more like this
val firstTask = Task(doSomeDBOperation).executeOn(io)
val secondTask = Task(doSomeDBOperation).executeOn(io)
val thirdTask = Task(doSomeDBUpdate).executeOn(io)

for {
  _ <- Task.parSequence(List(firstTask, secondTask, thirdTask))
  _ <- pushToKafkaTask
} yield ()
For parallel computations you use applicative methods, for sequential computations you use Monad methods (flatMap)


Task.parSequence(List(firstTask, secondTask, thirdTask)).doOnFinish(_ => pushToKafkaTask)

if you want try/finally semantics
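To illustrate the difference sketched above (the task names are hypothetical stand-ins for the ones in the discussion): flatMap-based sequencing runs the follow-up only when the preceding tasks succeed, while doOnFinish runs it regardless of outcome.

```scala
import monix.eval.Task

// Hypothetical stand-ins for the tasks in the discussion.
val dbTasks: List[Task[Unit]] = List(Task(()), Task(()), Task(()))
val pushToKafkaTask: Task[Unit] = Task(println("pushing Kafka event"))

// Runs pushToKafkaTask only after all DB tasks complete successfully:
val onSuccessOnly: Task[Unit] =
  Task.parSequence(dbTasks).flatMap(_ => pushToKafkaTask)

// Runs pushToKafkaTask even if the DB tasks fail (try/finally semantics);
// the callback receives Option[Throwable] telling you whether they failed:
val always: Task[Unit] =
  Task.parSequence(dbTasks).doOnFinish(_ => pushToKafkaTask)
```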

Jasper Moeys
Task.sequence should be sequential / Monad though
Rohan Sircar
I thought so too. Maybe it's because of the asyncBoundary calls?
@rohan-sircar yeah, it seems to be because of the asyncBoundary calls. Sadly, even the solution you gave does not work.
I will add further code snippets below to show what firstTask and secondTask look like:
val firstTask = dbOperation1(k)

def dbOperation1(k: objPUT)(implicit user: UserDetails, jt: JdbcTemplate, io: Scheduler): Task[Int] = {
  val params = Array(user.userId, DateUtils.currentTimestamp, k.getId)

  Task(jt.update(tDao.tUpdate, params: _*)).executeOn(io).asyncBoundary
}

val secondTask = dbOperation2(t.getId, t, existing)

def dbOperation2(id: String, input: objPUTGen, existing: objPUTGen = null, iStd: Boolean = true,
                 isNSRefUpdate: Boolean = false)(implicit user: UserDetails, jt: JdbcTemplate): Task[_] =
  Task.sequence(Seq(
    dbOperation3(id, input),
    if (iStd) dbOperation4(id, input) else Task.unit,
    dbOperation5(id, input, existing, isNSRefUpdate)
  ))

def dbOperation3(id: String, input: TemplateGeneric)(implicit user: UserDetails, jt: JdbcTemplate, io: Scheduler): Task[_] = {

  val sDel =
    """delete from "tableName"
      |where ID = ?""".stripMargin

  Task(jt.update(sDel, id)).executeOn(io).asyncBoundary
}
Rohan Sircar
Well... it should work. The code above looks fine; the problem could be in pushToKafka if it has un-suspended side effects. Otherwise idk ¯\_(ツ)_/¯
I have a question. I'm currently looking into a logged exception that seems to be reported to the scheduler. I assumed I could catch it by adding something like onErrorFallbackTo(Task.unit) just before any startAndForget, but that does not work, so I'm wondering whether the exception in UnfoldObservable.unsafeSubscribeFn gets reported regardless of whether it's caught.
Does anyone have advice on how to properly control logging of (supposedly) async exceptions?
1 reply
Mateusz Zakarczemny
How do you guys manage Java memory when running in a container environment (e.g. k8s) while using the Monix IO scheduler (Scheduler.io())?
The IO scheduler is unbounded, so it may spawn an undefined number of threads. Threads keep their stacks outside the Java heap space (-Xmx), so we cannot limit them that way.
Therefore, with a huge number of IO operations, using Scheduler.io may lead to OOM errors in the container.
Do you deal with that / manage that somehow?
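One common mitigation, sketched below under the assumption that your workload can tolerate queueing: replace the unbounded Scheduler.io() with a bounded pool (the scheduler name and pool size here are illustrative, not from the discussion).

```scala
import monix.execution.Scheduler

// Scheduler.io() grows without bound; a fixed pool caps the thread count
// (and therefore off-heap stack memory) at the cost of queueing work.
// A pool size of 64 is illustrative; size it to your workload and
// container limits, possibly together with a smaller -Xss stack size.
val boundedIo: Scheduler = Scheduler.fixedPool(name = "bounded-io", poolSize = 64)
```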
Vish Ramachandran
This is a question on monix-kafka. I want to create a consumer Observable by subscribing to a specific partition of a specific topic, starting at a specific offset; I manage checkpointing outside Kafka. Is this possible? This requires using Kafka's assign API rather than the usual subscribe API on the KafkaConsumer object. Is this supported? Could I get some pointers please?
1 reply
If not supported, what is the recommended workaround ?
Anirudh Vyas
hi, I am using the Monix scheduler to parallelize a long-running workload, and I need the system to stay up and running. For now I thought maybe I would use Thread.sleep(30 days) to do it, but I need a more elegant way. Any suggestions?
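One hedged alternative to a month-long Thread.sleep: park the main thread on a latch that a JVM shutdown hook releases (the object and method names below are illustrative, not from the discussion).

```scala
import java.util.concurrent.CountDownLatch

object KeepAlive {
  private val shutdownLatch = new CountDownLatch(1)

  // Blocks the calling thread until the JVM begins shutting down
  // (SIGTERM/SIGINT), without busy-waiting or an arbitrary sleep.
  def awaitShutdown(): Unit = {
    sys.addShutdownHook(shutdownLatch.countDown())
    shutdownLatch.await()
  }
}
```

Calling KeepAlive.awaitShutdown() at the end of main keeps the process alive until the container or operator stops it.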
Eliav Lavi
Hello 👋 funny question - am I missing something, or are Task.materialize and Task.attempt effectively the same thing?
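They are indeed close: both expose the outcome as a value, differing only in the container type. A sketch:

```scala
import monix.eval.Task
import scala.util.Try

val t: Task[Int] = Task(42)

// materialize wraps the outcome in scala.util.Try (Success/Failure):
val asTry: Task[Try[Int]] = t.materialize

// attempt wraps it in Either[Throwable, A] (Right/Left):
val asEither: Task[Either[Throwable, Int]] = t.attempt

// Semantically, t.attempt behaves like t.materialize.map(_.toEither).
```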
Peter Nham
Is there an easy way to go from Observable[ByteBuffer] to Observable[String]? I've attempted to use monix-nio UTF-8 decoding, but it's not splitting correctly on line ends.
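A minimal sketch without monix-nio, using plain JDK charset decoding (the function name is illustrative): decode each buffer to a String with map. Note the caveat in the comments; it does not handle characters split across buffer boundaries.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import monix.reactive.Observable

// Decode each ByteBuffer as UTF-8. Caveat: this assumes multi-byte
// characters never straddle a buffer boundary; if they can, a stateful
// java.nio.charset.CharsetDecoder carried across buffers is needed.
def decodeUtf8(source: Observable[ByteBuffer]): Observable[String] =
  source.map(bb => StandardCharsets.UTF_8.decode(bb).toString)
```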
Anirudh Vyas

hi there, I am trying to assemble a fat jar with monix using sbt assembly and I am getting these errors:

[error] deduplicate: different file contents found in the following:
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/io/monix/monix-internal-jctools_3/3.4.0/monix-internal-jctools_3-3.4.0.jar:scala/runtime/function/JProcedure18.class
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/scala-lang/scala3-library_3/3.1.2/scala3-library_3-3.1.2.jar:scala/runtime/function/JProcedure18.class
[error] deduplicate: different file contents found in the following:

any suggestions for me?

my build.sbt has:
  libraryDependencies += "io.monix" %% "monix" % "3.4.0",
this is my assembly setting:
lazy val assemblySettings = Seq(
  assembly / assemblyMergeStrategy := {
    case PathList("module-info.class") => MergeStrategy.discard
    case path if path.contains("META-INF/services") => MergeStrategy.concat
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  },
  assembly / test := {} // skip unit tests when building the uber jar
)
Alex Nedelcu
@AnirudhVyas: this is a known issue, will release a new version with the fix soon.
Alex Nedelcu
@AnirudhVyas: try the released version 3.4.1
Pierre Marais

Good morning. Will a Local or ThreadLocal be propagated when converting Task -> Future -> Task? It looks to me like this isn't working. Is there any way I can ensure propagation here?

implicit val scheduler = Scheduler.traced
val local = Local(0)

val f = for {
  _     <- Task.fromFuture(Task(local.update(1)).runToFuture)
  value <- Task(local.get)
} yield println(s"Local value in Future $value")

Await.result(f.runToFuture, Duration.Inf)

It prints Local value in Future 0

Even Local.isolate doesn't work here

Pierre Marais
Also tried explicitly with .runToFutureOpt(scheduler, Task.Options(true, true))
Alexandru Nedelcu

@Deeds67 I don't know why your particular sample doesn't work, but I can say that there are issues in this integration, as the propagation mechanisms are different.

Try running those tasks by setting an implicit Task.Options(localContextPropagation=true) in that context, and see if that works. Or switch to usage of a TaskLocal.

Pierre Marais
Thanks, TaskLocal works :+1:
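For reference, a minimal TaskLocal sketch (it must be run with local context propagation enabled, e.g. via runToFutureOpt with Task.Options(localContextPropagation = true), or on Scheduler.traced):

```scala
import monix.eval.{Task, TaskLocal}

// Unlike Local, a TaskLocal's value travels with the Task's run-loop
// across async boundaries when propagation is enabled.
val program: Task[Int] =
  for {
    local <- TaskLocal(0)   // create a local with default value 0
    _     <- local.write(1)
    value <- local.read     // reads 1 under traced execution
  } yield value
```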
Alexandru Nedelcu
Help needed — monix/monix#1560
Arunav Sanyal
Hi. I was looking through the monix source code. It seems that in all cases https://github.com/monix/monix/blob/v3.4.0/monix-catnap/shared/src/main/scala/monix/catnap/CircuitBreaker.scala#L415 the circuit breaker results in a RejectedExecutionException. I was wondering if it would be possible to instead complete with a different task when the circuit state is open. The reason is that I want to run the circuit breaker in a "dry run" mode to see if I have set up my params (maxFailures, resetTimeout, etc.) correctly.
Alexandru Nedelcu
@Khalian not sure what you need there. You can always catch the RejectedExecutionException, e.g. .onErrorRecoverWith { case _: RejectedExecutionException => myTask }. Does that not suffice?