@Avasil is it possible that Task.create somehow creates a new local context?
I'm investigating an issue where locals are not propagated when a task that's constructed with Task.create runs
Piotr Gawryś
@jvican It shouldn't create a new context unless there is a bug.
Running Task creates a new one if you're trying to share the context between concurrent "run-loops"
Does it happen if you use a different constructor like Task.eval?
@fizzy33 it's about better stack traces for Task, I've included a few examples in the PR: monix/monix#1267
@Avasil I think I'm onto something, replacing Task.create by Task.fromCancelablePromise makes the context propagation work
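A minimal way to probe the behaviour being discussed, sketched under the assumption of monix 3.x with TaskLocal; the names and the middle step are illustrative, not the actual code from the report:

```scala
import monix.eval.{Task, TaskLocal}

// Hypothetical repro sketch: write a TaskLocal, cross a task built with
// one of the constructors under discussion, then read the local back.
// With local context propagation enabled, the read should yield 42; the
// report above is that it does not when the middle task comes from
// Task.create.
val check: Task[Int] =
  for {
    local <- TaskLocal(0)
    _     <- local.write(42)
    _     <- Task.eval(()) // swap in a Task.create-based task to compare
    v     <- local.read
  } yield v

// check.executeWithOptions(_.enableLocalContextPropagation).runToFuture
// (requires an implicit Scheduler in scope)
```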
Piotr Gawryś
Okay, I am going to look into it later today
Alec Zorab
@jvican They'd definitely be pretty useful for me - at the moment I'm relying on a very roughly hacked-together edit of grpcmonix. It's not urgent though, so if there's a lot of extra work for you to get them into a releasable state, I'm not in a massive hurry
Glen Marchesani
hey, I am working on an RPC framework using monix (well, converting our existing RPC to be based on monix). Short description: the RPC is defined as

// api layer

case class DoSomethingRequest(
  foo: Int,
  numbersStream: Observable[Int],
)

case class DoSomethingResponse(
  bar: Int,
  summedNumbers: Observable[Int],
)

trait MyApi[M[_, _]] extends ApiBuilder[M] {
  val doSomething = remoteCall[DoSomethingRequest, DoSomethingResponse]
}

... // in some client elsewhere

implicit val messagingFabric: MessagingFabric = ...  // lots of ways to do this, 1-to-1 or 1-to-many RPC; let's assume MessagingFabric is hard-wired to a single server

object MyApiClient extends MyApi[RemoteCall]

// RemoteCall exposes the call as a Task[Response]

val request: DoSomethingRequest = ... 

val response: Task[DoSomethingResponse] = MyApiClient.doSomething(request)
noting that the summedNumbers in DoSomethingResponse is all managed and passed from server to client via the MessagingFabric
Would this be useful to other people ?
Glen Marchesani
to be clear, this isn't a pet project; it's a heavily hardened RPC layer we use at all our clients that can handle a diverse range of distributed topologies
Glen Marchesani
since it now relies on monix / observable I think it is generic enough and open-sourceable; we are just trying to decide, if we make the open-source effort, whether anyone will actually use it
Glen Marchesani
I have a use case where I have a global val allMessages: Observable[Message]
then a lot of routing, where a message has a correlation which routes it.
right now, for things that need a "subChannel", i.e. all messages matching a particular correlation, I am using
val subChannel = allMessages.filter(_.id === subChannelId)
I am going to run into use cases where there are thousands, if not 100K's, of sub-channels filtering off the same allMessages.
Is there anything in Monix that can support a more robust distributor pattern ?
I have my hand coded one I am trying now but figured to ask here first ;-)
Glen Marchesani
interestingly. On the UI side we are heavy users of https://github.com/OlivierBlanvillain/monadic-html and ran into the same thing there...
Fabien Chaillou

I have an issue with monix-kafka.
I have the following code:

     .map(data => Element(parseData(data), data.offset)))
     .timeoutOnSlowUpstreamTo(10.seconds, Observable.now(End))
     .mapEval((offsetBatch: CommittableOffsetBatch) => offsetBatch.commitAsync())

this fails because timeoutOnSlowUpstreamTo probably cancels the upstream, and that closes the Kafka consumer, so the commit fails with:
This consumer has already been closed

any idea to fix that ?

Fabien Chaillou
FYI, for the time being I'm creating a new consumer specifically to commit the offsets:
def commitOffsets(committableOffsetBatch: CommittableOffsetBatch): Task[Unit] = 
      .createConsumer[String, Array[Byte]](kafkaConfig.toConsumerConfig, List.empty[String]))
      .use { consumer =>
        Task {
          val commitMetadata = committableOffsetBatch.offsets.map {
            case (k, v) => k -> new OffsetAndMetadata(v)
Piotr Gawryś

... + mapEval take 10 seconds?

I'll try to think about it, but I think we need a new operator, something like UpstreamTimeoutObservable, except instead of canceling the subscription immediately after the timeout, it would wait for an Ack from downstream and return Stop. It would be pretty neat for graceful shutdown

Fabien Chaillou
actually I need the End at that place in my code to detect that we are done (I'm writing data to disk and then uploading to Snowflake, and I'm using the End to detect that we have no more data and flush the remaining files)
@Avasil so I can use UpstreamTimeoutObservable as a starting point to implement it?
If so I will try to take some time to look at that!
Piotr Gawryś
I'm looking at timeoutOnSlowUpstreamTo and it ignores the time it takes to process downstream (e.g. timeoutOnSlowUpstream(1.second).delayOnNext(10.minutes) won't time out right away), so your issue shouldn't happen regardless of how long ... and commit take.
Can you check if there is any Stop somewhere in ... before the commit finishes? Or whether the problem persists when the commit is synchronous?
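The claim above can be restated as a small sketch (assuming monix 3.x; the chained example is the one from the message above, and the timings are illustrative):

```scala
import scala.concurrent.duration._
import monix.reactive.Observable

// The timeout clock only measures the gap between upstream emissions;
// time the downstream spends holding each element is excluded, so this
// chain does not time out right away despite the 10-minute delay.
val guarded: Observable[Int] =
  Observable.now(1)
    .timeoutOnSlowUpstream(1.second)
    .delayOnNext(10.minutes)
```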
Fabien Chaillou
I'm only using high-level operators in my code so I'm not sure what would add the Stop
basically my whole high level logic is this :
        .map(message => Next(message.record.value(), message.committableOffset))
        .timeoutOnSlowUpstreamTo(10.seconds, Observable.now(End))
        .mapEval(data => data.traverse(parseMessage))
        .flatScan(Option.empty[FileToPush] -> CommittableOffsetBatch.empty) { ... }
        .collect { case (Some(fileToPush), offsetBatch) => DataWithOffset(fileToPush, offsetBatch) }
        .mapEval(fileToPush => fileToPush.traverse(pushFile))
        .mapEval { dataToWrite =>
          loadData(dataToWrite.data) <*
Piotr Gawryś
Try adding doOnEarlyStop(Task(println("A"))) after timeoutOnSlowUpstreamTo. Early stop happens when downstream wants to stop upstream (source).
There shouldn't be any Stop unless one of the methods fails. Maybe one of parseMessage, pushFile, addNextFileToPush?
Fabien Chaillou
our stream completes properly now with creating a new consumer for the commit so nothing should fail there
will try the earlyStop
oh, maybe it did fail at the time though !
@Avasil the "A" is not being printed
Piotr Gawryś
Weird, I feel like timeoutOnSlowUpstreamTo shouldn't cause this :D
Fabien Chaillou
sorry i have to go, i will continue tomorrow morning (eastern time) to investigate, thanks for the help !
i'm using the latest release (3.2.2) btw
Piotr Gawryś


since it now relies on monix / observable I think it is generic enough and open-sourceable; we are just trying to decide, if we make the open-source effort, whether anyone will actually use it

Hard to say if people will use it or not; often it's down to marketing :D If it's not a lot of effort for your team, I'd encourage you to do it regardless. There are very few non-trivial open-source projects that use Monix, or FP libraries in general. I see massive value in mature projects like yours

Is there anything in Monix that can support a more robust distributor pattern ?

If each subChannel has its own subChannelId, then you could call groupBy, but I'm probably missing some context and it's late for me
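To illustrate the trade-off with plain Scala collections (a sketch of the routing idea only; Message and the ids are made up, and Observable.groupBy is the streaming analogue of the grouping pass below):

```scala
// Illustrative model: routing messages by correlation id.
final case class Message(id: Int, payload: String)

val messages = List(Message(1, "a"), Message(2, "b"), Message(1, "c"))

// Per-subscriber filters: k sub-channels mean k full scans of the stream.
val channel1: List[String] = messages.filter(_.id == 1).map(_.payload)

// One grouping pass: the messages are consumed once and routed by id,
// which is what Observable.groupBy does for a live stream.
val routed: Map[Int, List[String]] =
  messages.groupBy(_.id).map { case (k, ms) => k -> ms.map(_.payload) }
```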

Glen Marchesani
@Avasil fair points...
groupBy just may do it. let me give that a try
Fabien Chaillou

Hello @Avasil, to debug my issue from yesterday I added doOnEarlyStop and doOnComplete between each of my steps:

        .map(message => Next(message.record.value(), KafkaOffset(message.committableOffset)))
        .doOnComplete(Task(println("Complete before timeoutOnSlowUpstreamTo")))
        .doOnEarlyStop(Task(println("Early stop before timeoutOnSlowUpstreamTo")))
        .timeoutOnSlowUpstreamTo(10.seconds, Observable.now(End))
        .doOnComplete(Task(println("Complete before parse messages")))
        .doOnEarlyStop(Task(println("Early stop before parse messages")))
        .mapEval(data => data.traverse(parseMessage))
        .doOnComplete(Task(println("Complete before write files to disk")))
        .doOnEarlyStop(Task(println("Early stop before write files to disk")))
        .flatScan(Option.empty[FileToPush] -> CommittableOffsetBatch.empty) { ... }
        .doOnComplete(Task(println("Complete before collect")))
        .doOnEarlyStop(Task(println("Early stop before collect")))
        .collect { case (Some(fileToPush), offsetBatch) => DataWithOffset(fileToPush, offsetBatch) }
        .doOnComplete(Task(println("Complete before pushFile to snowflake")))
        .doOnEarlyStop(Task(println("Early stop before pushFile to snowflake")))
        .mapEval(fileToPush => fileToPush.traverse(snowflakeManager.pushFile(snowflakeStageName)))
        .doOnComplete(Task(println("Complete before foldLeft for global state")))
        .doOnEarlyStop(Task(println("Early stop before foldLeft for global state")))
        .doOnComplete(Task(println("Complete before load into snowflake")))
        .doOnEarlyStop(Task(println("Early stop before load into snowflake")))
        .mapEval { stagesToUpload =>
          Task(println("Committing offsets")) >> stagesToUpload.committableBatch.commitAsync()
        }
        .doOnComplete(Task(println("Complete before headOrElseL")))
        .doOnEarlyStop(Task(println("Early stop before headOrElseL")))

looking at the result, it looks like the foldLeft completes the upstream early :

Complete before parse messages
Complete before write files to disk
Complete before collect
Complete before pushFile to snowflake
Complete before foldLeft for global state
Committing offsets
Complete before load into snowflake
Early stop before headOrElseL
Complete before headOrElseL
Early stop before load into snowflake

any suggestion ?
thanks :)

(I'm guessing the early stop comes from headOrElseL, which makes sense, but it should impact nothing since the foldLeft will only output one element anyway.)
Glen Marchesani
is there a way to merge two observables and complete the merged observable if either of the two sources completes ?
Piotr Gawryś
@fizzy33 I don't remember if there is an existing operator, but it's quite easy to write with Deferred:
      Observable.suspend {
        val signal = Deferred.unsafe[Task, Unit]
        val streamA = sourceA.takeUntilEval(signal.get).doOnComplete(signal.complete(()).onErrorHandle(_ => ()))
        val streamB = sourceB.takeUntilEval(signal.get).doOnComplete(signal.complete(()).onErrorHandle(_ => ()))

        Observable(streamA, streamB).merge
      }
Glen Marchesani
awesome I will try that thanks (that is better than my hack)
Glen Marchesani
that worked swimmingly thanks @Avasil

Hello, everyone! I have a dilemma about observables concatenation.
Pipeline description:

  1. I have some source observable stub
  2. Subscribe to the core observable and drop elements until stub is completed
  3. Then find the item from core by the predicate, map it, and then concatenate with the remaining elements from core
  4. Concatenate all the stub elements with the observable from step 3
  5. Result should be 1500, 16, 17, 18, 19, 20

Here's a solution that works as long as the source is slow and emits every element with a 50 millis delay.
val subject = ConcurrentSubject.publish[Int]
val core    = subject.asyncBoundary(OverflowStrategy.Default)
val stub    = Observable.empty[Int]

val resultObs = stub.publishSelector { history =>
  history ++ core
    .find(_ == 15)
    .map(_ * 100)
    .publishSelector { real =>
      real ++ core.dropUntil(real.completed)
    }
}

  .traverse((10 to 30).toList) { i =>

val future = resultObs.toListL.runToFuture
println(Await.result(future, 1.minute))

If I remove .delayExecution(50.millis) then I won't be able to catch items after 15.
Is there any way to rewrite it to catch all the items with the fastest source?

Jamie Pullar
I was wondering if anyone can help with a failing scenario I have.
We are using catnap to run a monix event PubSub model.
One of our ConsumerFs handles sending out metrics over HTTP using an OkHttp async client.
When we run on a low-CPU-count container (1-2 vcpus) and the requests start timing out, quite often the consumer will silently stop handling any more events and never recover.
With fewer CPU threads (0.5 vcpus), this can even stop all consumers on the event publisher!
Is there a core strategy or configuration approach that I am missing here?
class ConcurrentPubSub(chan: ProducerF[IO, Unit, Event] with ChannelF[IO, Unit, Event])(implicit CS: ContextShift[IO]) {

  def subscribe[E <: Event](listener: EventListener[IO, E])(implicit CT: ClassTag[E]): IO[Fiber[IO, Unit]] = {
    val clazz = CT.runtimeClass

    for {
      registered <- Deferred[IO, Unit]
      task = chan.consume.use(c => registered.complete(()) >> process(clazz, c, listener, None))
      fiber <- task.start
      _ <- registered.get
    } yield fiber
  }

  private def process[E <: Event](
                                   clazz: Class[_],
                                   consumer: ConsumerF[IO, Unit, Event],
                                   listener: EventListener[IO, E],
                                   index: Option[Int]
                                 ): IO[Unit] =
      .flatMap {
        case Right(e) if clazz.isAssignableFrom(e.getClass) && listener.handle.isDefinedAt(e.asInstanceOf[E]) =>

            .flatMap { _ =>
            }.handleErrorWith { ex =>
            // do stuff with the Exception
          } >> process(clazz, consumer, listener, index)
        case Left(_) =>
        //Channel halt received and logged
        case _ =>
          // This event is not relevant for this subscriber or is an ignored error
          process(clazz, consumer, listener, index)
Rohan Sircar
is calling .completedL.startAndForget on an observable unsafe/not recommended?
I noticed that the observable blocks the Task chain if I don't use parZip or startAndForget
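For reference, a hedged sketch of the difference being described (assuming monix 3.x; the stream and messages are illustrative):

```scala
import monix.eval.Task
import monix.reactive.Observable

val obs = Observable.range(0, 1000L)

// Sequencing completedL blocks the Task chain until the stream finishes:
val sequential: Task[Unit] =
  obs.completedL >> Task(println("runs only after the stream completes"))

// startAndForget forks the subscription onto its own fiber, so the chain
// continues immediately; the trade-off is that errors and cancellation of
// the forked stream are no longer observed by the parent Task.
val forked: Task[Unit] =
  obs.completedL.startAndForget >> Task(println("runs immediately"))
```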