Glen Marchesani
@fizzy33
I have a use case where I have a global val allMessages: Observable[Message],
then a lot of routing where a message has a correlation which routes it.
Right now, for things that need a "subChannel" (i.e. all messages matching a particular correlation) I am using
val subChannel = allMessages.filter(_.id === subChannelId)
I am going to run into use cases where there are thousands, if not 100K's, of sub-channels filtering off the same allMessages.
Is there anything in Monix that can support a more robust distributor pattern?
I have my hand-coded one I am trying now but figured to ask here first ;-)
Glen Marchesani
@fizzy33
Interestingly, on the UI side we are heavy users of https://github.com/OlivierBlanvillain/monadic-html and we ran into the same thing there...
Fabien Chaillou
@fchaillou

Hello,
I have an issue with monix-kafka.
I have the following code:

  kafkaObservable
     .map(data => Element(parseData(data), data.offset))
     .timeoutOnSlowUpstreamTo(10.seconds, Observable.now(End))
     ...
     .mapEval((offsetBatch: CommittableOffsetBatch) => offsetBatch.commitAsync())

This fails, because timeoutOnSlowUpstreamTo probably cancels the upstream, which closes the Kafka consumer, so the commit fails with:
This consumer has already been closed

Any idea how to fix that?

Fabien Chaillou
@fchaillou
FYI, for the time being I'm creating a new consumer specifically to commit the offsets:
import cats.effect.Resource
import monix.eval.Task
import monix.kafka.{CommittableOffsetBatch, KafkaConsumerObservable}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import scala.concurrent.blocking
import scala.jdk.CollectionConverters._

def commitOffsets(committableOffsetBatch: CommittableOffsetBatch): Task[Unit] =
  Resource
    .fromAutoCloseable(
      KafkaConsumerObservable
        .createConsumer[String, Array[Byte]](kafkaConfig.toConsumerConfig, List.empty[String]))
    .use { consumer =>
      Task {
        val commitMetadata = committableOffsetBatch.offsets.map {
          case (k, v) => k -> new OffsetAndMetadata(v)
        }.asJava
        // commitSync blocks, so wrap it; synchronized guards the non-thread-safe consumer
        blocking(consumer.synchronized(consumer.commitSync(commitMetadata)))
      }
    }
Piotr Gawryś
@Avasil

... + mapEval take 10 seconds?

I'll try to think about it, but I think we need a new operator; something like UpstreamTimeoutObservable but, instead of canceling the subscription immediately after the timeout, it would wait for the Ack from downstream and return Stop. It would be pretty neat for graceful shutdown.

Fabien Chaillou
@fchaillou
Actually I need the End at that place in my code to detect that we are done (I'm writing data to disk and then uploading to Snowflake, and I'm using the End to detect that we have no more data and should flush the remaining files).
@Avasil so I can take UpstreamTimeoutObservable as an example to implement it?
If so, I will try to take some time to look at that!
Piotr Gawryś
@Avasil
I'm looking at timeoutOnSlowUpstreamTo and it ignores the time it takes to process downstream (e.g. timeoutOnSlowUpstream(1.second).delayOnNext(10.minutes) won't time out right away), so your issue shouldn't happen regardless of how long ... and the commit take.
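A runnable sketch of that behavior (illustrative values, per the description above):

  import scala.concurrent.duration._
  import monix.eval.Task
  import monix.reactive.Observable

  // The delay sits downstream of timeoutOnSlowUpstream, so the time spent
  // waiting for the downstream ack does not count toward the 1-second timeout.
  val noTimeout: Task[Unit] =
    Observable.range(0, 3)
      .timeoutOnSlowUpstream(1.second)
      .delayOnNext(10.seconds)
      .foreachL(println)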
Can you check if there is any Stop somewhere in ... before the commit finishes? Or whether the problem persists if the commit is synchronous?
Fabien Chaillou
@fchaillou
I'm only using high-level operators in my code so I'm not sure what would add the Stop.
Basically my whole high-level logic is this:
events.streamEvents
        .map(message => Next(message.record.value(), message.committableOffset))
        .timeoutOnSlowUpstreamTo(10.seconds, Observable.now(End))
        .mapEval(data => data.traverse(parseMessage))
        .flatScan(Option.empty[FileToPush] -> CommittableOffsetBatch.empty) { ... }
        .collect { case (Some(fileToPush), offsetBatch) => DataWithOffset(fileToPush, offsetBatch) }
        .mapEval(fileToPush => fileToPush.traverse(pushFile))
        .foldLeft(SnowflakeUploadState())(_.addNextFileToPush(_))
        .mapEval { dataToWrite =>
          loadData(dataToWrite.data) <*
            dataToWrite.committableBatch.commitAsync
        }
        .headOrElseL(Map.empty)
Piotr Gawryś
@Avasil
Try adding doOnEarlyStop(Task(println("A"))) after timeoutOnSlowUpstreamTo. Early stop happens when downstream wants to stop upstream (source).
There shouldn't be any Stop unless one of the methods fails. Maybe one of parseMessage, pushFile, addNextFileToPush?
Fabien Chaillou
@fchaillou
Our stream completes properly now that we create a new consumer for the commit, so nothing should fail there.
Will try the earlyStop.
Oh, maybe it did fail at the time though!
@Avasil the "A" is not being printed
Piotr Gawryś
@Avasil
Weird, I feel like timeoutOnSlowUpstreamTo shouldn't cause this :D
Fabien Chaillou
@fchaillou
Sorry, I have to go; I will continue investigating tomorrow morning (Eastern time). Thanks for the help!
I'm using the latest release (3.2.2), btw.
Piotr Gawryś
@Avasil

@fizzy33

since it now relies on monix / observable I think it is generic enough and open-sourceable; we are just trying to decide, if we make the open-source effort, whether anyone will actually use it

Hard to say if people will use it or not; often it's up to the marketing :D If it's not a lot of effort for your team, I'd encourage you to do it regardless. There are very few non-trivial open-source projects that use Monix, or FP libraries in general. I see massive value in mature projects like yours.

Is there anything in Monix that can support a more robust distributor pattern ?

If each subChannel has its own subChannelId then you could call groupBy, but I'm probably missing some context and it's late for me.
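For example, a minimal sketch (the Message type and correlationId field here are made up for illustration):

  import monix.eval.Task
  import monix.reactive.Observable

  // Illustrative message type; correlationId stands in for the routing key.
  final case class Message(correlationId: Int, payload: String)

  val allMessages: Observable[Message] =
    Observable.range(0, 100).map(i => Message((i % 3).toInt, s"msg-$i"))

  // groupBy fans the source out into one sub-stream per key, instead of
  // running thousands of independent filters over the same source.
  val routed: Task[Unit] =
    allMessages
      .groupBy(_.correlationId)
      .mergeMap(group => group.map(msg => s"channel ${group.key}: ${msg.payload}"))
      .foreachL(println)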

Glen Marchesani
@fizzy33
@Avasil fair points...
groupBy just may do it. Let me give that a try.
Fabien Chaillou
@fchaillou

Hello @Avasil, to debug my issue from yesterday I added doOnEarlyStop and doOnComplete between each of my steps:

events.streamEvents
        .map(message => Next(message.record.value(), KafkaOffset(message.committableOffset)))
        .doOnComplete(Task(println("Complete before timeoutOnSlowUpstreamTo")))
        .doOnEarlyStop(Task(println("Early stop before timeoutOnSlowUpstreamTo")))
        .timeoutOnSlowUpstreamTo(10.seconds, Observable.now(End))
        .doOnComplete(Task(println("Complete before parse messages")))
        .doOnEarlyStop(Task(println("Early stop before parse messages")))
        .mapEval(data => data.traverse(parseMessage))
        .doOnComplete(Task(println("Complete before write files to disk")))
        .doOnEarlyStop(Task(println("Early stop before write files to disk")))
        .flatScan(Option.empty[FileToPush] -> CommittableOffsetBatch.empty) { ... }
        .doOnComplete(Task(println("Complete before collect")))
        .doOnEarlyStop(Task(println("Early stop before collect")))
        .collect { case (Some(fileToPush), offsetBatch) => DataWithOffset(fileToPush, offsetBatch) }
        .doOnComplete(Task(println("Complete before pushFile to snowflake")))
        .doOnEarlyStop(Task(println("Early stop before pushFile to snowflake")))
        .mapEval(fileToPush => fileToPush.traverse(snowflakeManager.pushFile(snowflakeStageName)))
        .doOnComplete(Task(println("Complete before foldLeft for global state")))
        .doOnEarlyStop(Task(println("Early stop before foldLeft for global state")))
        .foldLeft(SnowflakeUploadState())(_.addNextFileToPush(_))
        .doOnComplete(Task(println("Complete before load into snowflake")))
        .doOnEarlyStop(Task(println("Early stop before load into snowflake")))
        .mapEval { stagesToUpload =>
          Task(println("Committing offsets")) >> stagesToUpload.committableBatch.commitAsync()
        }
        .doOnComplete(Task(println("Complete before headOrElseL")))
        .doOnEarlyStop(Task(println("Early stop before headOrElseL")))
        .headOrElseL(Map.empty)

Looking at the result, it looks like the foldLeft completes the upstream early:

Complete before parse messages
Complete before write files to disk
Complete before collect
Complete before pushFile to snowflake
Complete before foldLeft for global state
Committing offsets
Complete before load into snowflake
Early stop before headOrElseL
Complete before headOrElseL
Early stop before load into snowflake

Any suggestion?
Thanks :)

(I'm guessing the early stop comes from headOrElseL, which makes sense, but it should impact nothing, as the foldLeft will only output one element anyway.)
Glen Marchesani
@fizzy33
Is there a way to merge two observables and complete the merged observable if either of the two sources completes?
Piotr Gawryś
@Avasil
@fizzy33 I don't remember if there is any existing operator, but it's quite easy to write with Deferred (sourceA and sourceB being the two observables to merge):
  Observable.suspend {
    val signal = Deferred.unsafe[Task, Unit]
    // complete(()) fails if the signal is already completed, hence the onErrorHandle
    val streamA = sourceA.takeUntilEval(signal.get).doOnComplete(signal.complete(()).onErrorHandle(_ => ()))
    val streamB = sourceB.takeUntilEval(signal.get).doOnComplete(signal.complete(()).onErrorHandle(_ => ()))

    Observable(streamA, streamB).merge
  }
Glen Marchesani
@fizzy33
Awesome, I will try that, thanks (that is better than my hack).
Glen Marchesani
@fizzy33
That worked swimmingly, thanks @Avasil
Ilya
@squadgazzz

Hello, everyone! I have a dilemma about observable concatenation.
Pipeline description:

  1. I have some source observable stub
  2. Subscribe to the core observable and drop elements until stub is completed
  3. Then find the item from the core by the predicate, map it, and then concatenate with the remaining elements from the core
  4. Concatenate all the stub elements with the observable from step 3
  5. The result should be 1500, 16, 17, 18, 19, 20

Here's a solution that works as long as the source is slow and emits an element every 50 millis.
val subject = ConcurrentSubject.publish[Int]
val core    = subject.asyncBoundary(OverflowStrategy.Default)
val stub    = Observable.empty[Int]

val resultObs = stub.publishSelector { history =>
  history ++ core
    .dropUntil(history.completed)
    .find(_ == 15)
    .map(_ * 100)
    .publishSelector { real =>
      real ++ core.dropUntil(real.completed)
    }
}

Task
  .traverse((10 to 30).toList) { i =>
    Task
      .eval(subject.onNext(i))
      .delayExecution(50.millis)
  }
  .executeAsync
  .runToFuture

val future = resultObs.toListL.runToFuture
Thread.sleep(5.seconds.toMillis)
subject.onComplete()
println(Await.result(future, 1.minute))

If I remove .delayExecution(50.millis) then I won't be able to catch the items after 15.
Is there any way to rewrite it so it catches all the items even with the fastest source?

Jamie Pullar
@JamiePullar
I was wondering if anyone can help with a failing scenario I have.
We are using catnap to run a Monix event PubSub model.
One of our ConsumerFs handles sending out metrics over HTTP using an OkHttp async client.
When we run on a lower CPU-count container (1-2 vcpus) and the requests start timing out, quite often the consumer will silently stop handling any more events and never recover.
With fewer CPU threads (0.5 vcpus), this can even stop all consumers on the event publisher!
Is there a core strategy or configuration approach that I am missing here?
class ConcurrentPubSub(chan: ProducerF[IO, Unit, Event] with ChannelF[IO, Unit, Event])(implicit CS: ContextShift[IO]) {

  def subscribe[E <: Event](listener: EventListener[IO, E])(implicit CT: ClassTag[E]): IO[Fiber[IO, Unit]] = {
    val clazz = CT.runtimeClass

    for {
      registered <- Deferred[IO, Unit]
      task = chan.consume.use(c => registered.complete(()) >> process(clazz, c, listener, None))
      fiber <- task.start
      _ <- registered.get
    } yield fiber
  }

  private def process[E <: Event](
      clazz: Class[_],
      consumer: ConsumerF[IO, Unit, Event],
      listener: EventListener[IO, E],
      index: Option[Int]
  ): IO[Unit] =
    consumer.pull
      .flatMap {
        case Right(e) if clazz.isAssignableFrom(e.getClass) && listener.handle.isDefinedAt(e.asInstanceOf[E]) =>
          IO.unit
            .flatMap { _ =>
              listener.handle(e.asInstanceOf[E])
            }
            .handleErrorWith { ex =>
              // do stuff with the Exception
              IO.unit
            } >> process(clazz, consumer, listener, index)
        case Left(_) =>
          // Channel halt received and logged
          IO.unit
        case _ =>
          // This event is not relevant for this subscriber or is an ignored error
          process(clazz, consumer, listener, index)
      }
}
Rohan Sircar
@rohan-sircar
Is calling .completedL.startAndForget on an observable unsafe / not recommended?
I noticed that the observable blocks the Task chain if I don't use parZip or startAndForget.
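For example, a minimal sketch of the behavior (using a never-ending source for illustration):

  import scala.concurrent.duration._
  import monix.eval.Task
  import monix.reactive.Observable

  // A never-ending stream: completedL only finishes when the stream does.
  val ticks = Observable.intervalAtFixedRate(1.second)

  // Sequencing completedL blocks the rest of the chain forever...
  val blocked: Task[Unit] =
    ticks.completedL >> Task(println("never reached"))

  // ...while startAndForget runs it in the background and moves on.
  val detached: Task[Unit] =
    ticks.completedL.startAndForget >> Task(println("runs immediately"))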
Alexandru Nedelcu
@alexandru

@rohan-sircar

I noticed that the observable blocks the Task chain if I don't use parZip or startAndForget

Not sure what you mean. Do you have an example?

Rohan Sircar
@rohan-sircar
@alexandru I created a minimal example - https://pastebin.com/RQHftUpe . Maybe I'm doing something wrong. I've used Observable.create here, but the same thing happens with an observable from a ConcurrentSubject.
Piotr Gawryś
@Avasil

@rohan-sircar That's expected and using startAndForget is fine.

The difference between parZip and startAndForget shows up when any of the concurrent tasks fails.

If parZip is used, a failure will cancel the other task and the composed Task will return with an error.
In the case of startAndForget there is no connection, so it will fail in the background without any effect on the other Task.
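A minimal sketch of the two behaviors (the task names are illustrative):

  import scala.concurrent.duration._
  import monix.eval.Task

  // One task fails after a second, the other never completes.
  val failing: Task[Unit] = Task.sleep(1.second) >> Task.raiseError(new Exception("boom"))
  val forever: Task[Unit] = Task.never

  // parZip2: the failure cancels `forever` and the composed Task fails.
  val linked: Task[Unit] = Task.parZip2(failing, forever).void

  // startAndForget: `failing` fails in the background; this Task still succeeds.
  val detached: Task[Unit] = failing.startAndForget >> Task.sleep(2.seconds)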

And sorry guys, I will look at those other questions later today; I was busy with releases and didn't want to distract myself with Gitter.
Rohan Sircar
@rohan-sircar

@Avasil Thanks for explaining. Just curious, but could you explain why this happens? Especially because this does not happen with Observable.apply.

I was thinking that this could be error-prone. If there's an observable a few levels inside a nested Task chain, and you forget to start it, it will block that Task sub-chain and there will be no way of knowing it from the outer levels, because the type does not reflect it.

Piotr Gawryś
@Avasil

@rohan-sircar I've answered without looking at your example. There is a potential mistake in the Observable.create example, because you never call onComplete, so the Observable never ends.

I think I've misunderstood you - completedL doesn't block the Task chain forever. It blocks until the stream completes. So yeah, if the stream never ends then it will "block", but it's the same with Task.

In general it's desirable, because it lets you back-pressure on the result of the Observable, and for other operators returning Task (toListL, sumL, etc.) the signature allows no other behavior. If someone doesn't want to wait for completion, it's easy to opt out by making the Task run concurrently with others.
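For instance, a minimal sketch of a finite Observable.create (the values are illustrative):

  import monix.eval.Task
  import monix.execution.Cancelable
  import monix.reactive.{Observable, OverflowStrategy}

  // A finite stream: calling onComplete lets completedL terminate.
  val finite: Observable[Int] =
    Observable.create[Int](OverflowStrategy.Unbounded) { sub =>
      sub.onNext(1)
      sub.onNext(2)
      sub.onComplete() // without this, completedL would never finish
      Cancelable.empty
    }

  val done: Task[Unit] = finite.completedL // completes once the stream ends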

Rohan Sircar
@rohan-sircar
Oh, right. The Observable.create example in the docs doesn't call onComplete either, and that's what I was following. And my use case does deal with handling "infinite streams" of events for the life of the application, so I cannot call onComplete in those. I think I'll let it be; at least I now know what's going on. Thanks.
Piotr Gawryś
@Avasil

@JamiePullar

When we run on a lower CPU Count container (1 - 2 vcpus) and the Requests start timing out, quite often the consumer will silently Stop handling any more events and never recover.

Are you able to debug what's going on in process when it happens? Is it stuck on consumer.pull? Is fiber completed? Is there an error in listener.handle? Cancelation?

Jamie Pullar
@JamiePullar
@Avasil We have not been able to replicate it locally. I'm getting a developer to build a localised test bench to replicate it. In the meantime I have made a bunch of small changes for our prod environment - switching out the blaze client for the async one (missed this in our conf file) and using cats Blocker for an SMTP transport call. If we can replicate it and get stuck, I'll get back to you. Thanks for the response.
Matthew de Detrich
@mdedetrich
@Avasil Wanted to thank you for pushing Monix 3.3.0 out. I apologize for being out of the loop; I have a lot of personal stuff going on, but I greatly appreciate the effort.
We will start integrating these changes into our services, which shouldn't be too hard since I am using Monix 3.2.x.
Piotr Gawryś
@Avasil
@mdedetrich No worries, enjoy the features :D and good luck with personal stuff