Mateusz Zakarczemny
How do you guys manage Java memory when running in a container environment (e.g. k8s) while using the monix IO scheduler (Scheduler.io())?
The IO scheduler is unbounded, so it may spawn an arbitrary number of threads. Thread stacks live outside the Java heap (-Xmx), so the heap limit does not bound them.
Therefore, with a huge number of IO operations, using Scheduler.io may lead to OOM errors on the container.
Do you deal with that / manage that somehow?
Vish Ramachandran
This is a question on monix-kafka. I want to create a consumer Observable by subscribing to a specific partition number of a specific topic, starting at a specific offset. I manage checkpointing outside Kafka. Is this possible? It requires using Kafka's assign API rather than the usual subscribe API on the KafkaConsumer object. Is this supported? Could I get some pointers, please?
If not supported, what is the recommended workaround?
Anirudh Vyas
Hi, I am using a monix scheduler to parallelize a long-running workload, and I need the system to stay up and running. For now I thought maybe I would use Thread.sleep(30 days) to do it, but I need a more elegant way to do this. Any suggestions?
Eliav Lavi
Hello 👋 funny question - am I missing something, or are Task.materialize & Task.attempt effectively the same thing?
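
For reference, the two operators expose the same information in different result types: materialize yields a Task[Try[A]], while attempt yields a Task[Either[Throwable, A]]. A minimal sketch, assuming monix 3.x on the JVM (the object name and the "boom" error are illustrative only):

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.util.Try

object MaterializeVsAttempt {
  // A task that always fails, used only to illustrate the two result types.
  val failing: Task[Int] = Task.raiseError(new RuntimeException("boom"))

  // materialize exposes the outcome as a scala.util.Try
  val asTry: Task[Try[Int]] = failing.materialize

  // attempt exposes the outcome as an Either with the error on the left
  val asEither: Task[Either[Throwable, Int]] = failing.attempt

  def main(args: Array[String]): Unit = {
    println(asTry.runSyncUnsafe())
    println(asEither.runSyncUnsafe())
  }
}
```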
Peter Nham
Is there an easy way to go from Observable[ByteBuffer] => Observable[String]? I've attempted to use monix-nio UTF decoding, but it's not splitting correctly on the line ends.
Anirudh Vyas

hi there, I am trying to assemble a fat jar with monix using sbt assembly and I am getting these errors:

[error] deduplicate: different file contents found in the following:
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/io/monix/monix-internal-jctools_3/3.4.0/monix-internal-jctools_3-3.4.0.jar:scala/runtime/function/JProcedure18.class
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/scala-lang/scala3-library_3/3.1.2/scala3-library_3-3.1.2.jar:scala/runtime/function/JProcedure18.class
[error] deduplicate: different file contents found in the following:

Any suggestions for me?

My build.sbt has:
  libraryDependencies += "io.monix" %% "monix" % "3.4.0",
This is my assembly setting:
lazy val assemblySettings = Seq(
  assembly / assemblyMergeStrategy := {
    case PathList("module-info.class") => MergeStrategy.discard
    case path if path.contains("META-INF/services") => MergeStrategy.concat
    case PathList("META-INF", xs@_*) => MergeStrategy.discard
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  },
  assembly / test := {} // skip unit tests when building the uber jar
)
Alex Nedelcu
@AnirudhVyas: this is a known issue, will release a new version with the fix soon.
Alex Nedelcu
@AnirudhVyas: try the released version 3.4.1
Pierre Marais

Good morning. Will a Local or ThreadLocal be propagated when converting Task -> Future -> Task? Looks to me like this isn't working. Any way I can ensure propagation here?

implicit val scheduler = Scheduler.traced
val local = Local(0)

val f = for {
  _     <- Task.fromFuture(Task(local.update(1)).runToFuture)
  value <- Task(local.get)
} yield println(s"Local value in Future $value")

Await.result(f.runToFuture, Duration.Inf)

It prints Local value in Future 0

Even Local.isolate doesn't work here

Pierre Marais
Also tried explicitly with .runToFutureOpt(scheduler, Task.Options(true, true))
Alexandru Nedelcu

@Deeds67 I don't know why your particular sample doesn't work, but I can say that there are issues in this integration, as the propagation mechanisms are different.

Try running those tasks by setting an implicit Task.Options(localContextPropagation=true) in that context, and see if that works. Or switch to usage of a TaskLocal.

Pierre Marais
Thanks, TaskLocal works :+1:
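
For readers following along, a minimal sketch of the TaskLocal approach mentioned above, assuming monix 3.x on the JVM. The object and value names are illustrative, and this sketch does not include the Future round-trip from the original sample; it only shows a TaskLocal being written and read with local context propagation enabled:

```scala
import monix.eval.{Task, TaskLocal}
import monix.execution.Scheduler.Implicits.global

object TaskLocalSketch {
  // Creates a local with default 0, writes 1, then reads it back.
  val program: Task[Int] =
    for {
      local <- TaskLocal(0)
      _     <- local.write(1)
      value <- local.read
    } yield value

  // TaskLocal needs local context propagation enabled on the run loop.
  val withPropagation: Task[Int] =
    program.executeWithOptions(_.enableLocalContextPropagation)

  def main(args: Array[String]): Unit =
    println(s"TaskLocal value: ${withPropagation.runSyncUnsafe()}")
}
```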
Alexandru Nedelcu
Help needed — monix/monix#1560
Arunav Sanyal
Hi. I was looking through the monix source code. It seems that in all cases (https://github.com/monix/monix/blob/v3.4.0/monix-catnap/shared/src/main/scala/monix/catnap/CircuitBreaker.scala#L415) the circuit breaker results in a RejectedExecutionException. I was wondering if it would be possible to instead complete with a different task when the circuit state is open. The reason is that I want to run the circuit breaker in a "dry run" mode to see whether I have set up my params (maxFailures, resetTimeout, etc.) correctly.
Alexandru Nedelcu
@Khalian not sure what you need there. You can always catch the RejectedExecutionException — e.g., .recoverWith { case _: RejectedExecutionException => myTask }. Does that not suffice?
Arunav Sanyal
Well, the problem is a bit more nuanced than that. I am effectively trying to write something called "Observer Mode", in which we don't even circuit break (the operating problem for me is that I do not know upfront how quickly I should be rejecting requests in production, and I want to get some actual data). Not to mention that I need different messaging for different use cases. And I conceptually think of that as a [Option[() => Task[R]]] and in case. The latter half of the problem is solved by your suggestion, except there is now a pointless raiseError and recoverWith which could have been avoided, and the former does not have a solution today (hence I was wondering whether I should write a pull request for this model).
The other problem I am trying to solve is: if I have a badly set config and I want to update it in production based on the data I see, what do I do? I suppose I can solve that by keeping a couple of pieces of state for resetTimeout and maxFailures, and if they change (say, with AWS AppConfig as the backing dynamic store), I can just reinitialize a new CircuitBreaker.
Hi, could someone tell me, please, what this means?
FrmPedido.scala:276:60: Symbol 'type cats.NonEmptyParallel.Aux' is missing from the classpath.
[error] This symbol is required by 'value monix.reactive.Observable.observableNonEmptyParallel'.
[error] Make sure that type Aux is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'Observable.class' was compiled against an incompatible version of cats.NonEmptyParallel.
Vish Ramachandran
@alexandru I am trying to convert messages received in an akka actor (receiving remote messages) to a monix Observable. The main intent is to process the numerous messages received in a pipelined way and reduce memory consumption. I see earlier threads for this use case where the suggestion is to use ConcurrentSubject. But ConcurrentSubject buffers all messages in a Vector without removing them after consumption. In my case, I have a single producer and a single consumer, and there is no requirement for multicast to multiple consumers. Is there a simpler recipe here that is more memory efficient?
Vish Ramachandran
I looked into the Observable.repeatEvalF with ConcurrentQueue approach, but it is not clear how to feed the queue from the actor's receive method. Any suggestions on this thread as well?
Can Monix be easily integrated into React (slinky) Scala.js apps?
Or do you need to manually call render on the React app to propagate changes to the virtual DOM?
Ivan Oreskovic

Hi there!
Is there a way to do echoRepeated, but only a limited number of times and/or with a changing timeout?
Or, as a more generalized approach, echoRepeated with a "retryPolicy" of sorts.
I have a case where I should be emitting the last known element from the source, but with exponentially increasing delays.
I quickly drafted an ugly way to do it:

  def backoff[A](maxRetries: Int, backoffStrategy: BackoffStrategy): Observable[A] => Observable[A] =
    source => {
      @tailrec def backoff0(source0: Observable[A], retries: Int = 0): Observable[A] = {
        val delay = backoffStrategy.computeDelayBeforeNextRetry(
        if (retries >= maxRetries)
          backoff0(source0.echoOnce(delay.millis), retries + 1)

But I was wondering if there is something more elegant?

Hi there, I'm not quite sure when to use deferFuture versus deferFutureAction. I tend to use deferFutureAction all the time, but I'm not sure it's required. What I usually wrap are function calls that return futures.
Jason Xiong

Is there an easy way to disallow returning Task in Task#map? A somewhat common error for us is that people return a Task in Task#map, which leads to Task[Task[T]], but type erasure makes it so that this is indistinguishable from Task[Unit], leading to dangling tasks.

As an example, the following compiles but is bugged, since the inner doSomething(x) will never run.

def f(): Task[Unit] = {
  for {
    x <- getSomething()
  } yield {
    doSomething(x)  // doSomething returns a Task.
  }
}

We currently ban Task[Unit] for this reason, but I'm wondering if there's a more standardized approach to handle this edge case?
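
One way to avoid the dangling task in the example above is to sequence the inner Task with a generator (flatMap) instead of returning it from yield (map). A minimal sketch, assuming monix 3.x; getSomething and doSomething are hypothetical stand-ins for the functions in the question, and the AtomicBoolean exists only to make the effect observable. Independently of monix, enabling the compiler's value-discard warning (for example -Wvalue-discard on Scala 2.13) together with fatal warnings may catch the original form at compile time.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

object FlatMapNotMap {
  // Records whether the inner task actually ran.
  val ran = new AtomicBoolean(false)

  // Hypothetical stand-ins for the functions in the question.
  def getSomething(): Task[Int] = Task(42)
  def doSomething(x: Int): Task[Unit] = Task { ran.set(true) }

  // The inner task is bound with `<-`, so it becomes part of the chain.
  def f(): Task[Unit] =
    for {
      x <- getSomething()
      _ <- doSomething(x)
    } yield ()

  def main(args: Array[String]): Unit = {
    f().runSyncUnsafe()
    println(s"inner task ran: ${ran.get}")
  }
}
```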