Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Vasily Kirichenko
@vasily-kirichenko
I need to process tasks in parallel, by a preconfigured number of (stateful) workers. Consumer.loadBalance looks like a perfect fit, but the workers work like a function, not like a final consumers, so they cannot be at the end of the stream. Ideas?
Vasily Kirichenko
@vasily-kirichenko
With Consumer.loadBalance it looks like this:
val consumers =
    List(("acc1", 500.millis), ("acc2", 300.millis))
        .map {
            case (account, delay) =>
                Consumer.foreachTask((task: Long) =>
                    Task.sleep(delay).map(_ => println(s"[$account] processed $task")))
        }

val finished =
    Observable.range(0, 6)
        .dump("source")
        .consumeWith(Consumer.loadBalance(consumers: _*))

Await.result(finished.runAsync, 5.seconds)
But consumers provide results, which should be processed further by downstream, which seems impossible with ^^ approach.
Vasily Kirichenko
@vasily-kirichenko
It works, but I'm uncomfortable with the fact that there are actually two streams connected implicitly by a MVar.
Phong Nguyen
@phongngtuan

Hi, I have this issue where the last element on a blocking Iterator is not passed through the pipeline. I'm looking at this piece of code in IteratorAsObservable 's fastLoop

      val next = iter.next()
      iteratorHasNext = iter.hasNext
      streamErrors = false
      ack = out.onNext(next)

looks like if the 2nd line is blocking (on an infinite Iterator for example) then the onNext callback on the current element is never called? Did I miss something here? @alexandru ?
Many thanks!

Phong Nguyen
@phongngtuan
Another dumb question: if i were to implement my own Observable to use a callback, is there any contract I should conform?
Oleg Pyzhcov
@oleg-py
@phongngtuan re latest question, it's easier to just use subjects, but generally you have a backpressure/Ack ordering - so you can't call onNext until the Future from previous onNext has resolved with Continue
Phong Nguyen
@phongngtuan
@oleg-py that means in the Observable implementation I should wait for the Future[Ack] to complete? Possibly with an entrant lock?
Oleg Pyzhcov
@oleg-py
Wait - yes, and for onNext only - onComplete can be signalled concurrently, for instance.
It's easier to just use ConcurrentSubject :)
Phong Nguyen
@phongngtuan
@oleg-py How will ConcurrentSubject integrate with a callback-based Java API? The api can give me either a Java stream or a callback. I tried Observable.fromIterator but that seems to not work well with infinite iterator so I implement my own Observable instead. Anw now that you mention Subject, it seems similar to a Pipe isn’t it? I am also using pipe as I have a function Observable[A] => Observable[B] that I use with pipeThrough with unicast. Monix 2 had a transform method that is now removed but I don’t know the alternative, should I be using pipeThrough?
Oleg Pyzhcov
@oleg-py
With ConcurrentSubject, whenever the API calls you back, you just call onNext with received value and that's it.
And you can flat/map/etc. it like a regular observable
It is a way to create observables by imperatively feeding events into it. Not really much in common with pipes
Phong Nguyen
@phongngtuan
@oleg-py re: concurrent object. Got it, so I should construct a Subject and consume from it. I’m reading the doc but it’s not obvious how to construct one
Phong Nguyen
@phongngtuan
Re: pipe and subject. Subject can subscribe to a Observable and emits downstream, that looks like a pipe to me
Oleg Pyzhcov
@oleg-py
I've never subscribed a subject to an observable :)
Piotr Gawryś
@Avasil
one day it will be merged and easier to find :)
James Phillips
@jdrphillips
@oleg-py Thanks for your reply above!
Oleg Pyzhcov
@oleg-py
yw
Phong Nguyen
@phongngtuan
thanks @oleg-py @Avasil :)
Michael Zuber
@mgzuber

Hi, wondering if someone could clarify some best practice advice to me.

I have a queue: scala.collection.mutable.Queue[T] that gets populated from some external source. I would like to create an Observable[T] from this queue that dequeues elements from the queue if they are there.

I'm able to quite easily do this by:

def dequeue: Task[Option[T]] = Task { if (queue.nonEmpty) Option(queue.dequeue) else Option.empty }
val obs = Observable.repeatEvalF(dequeue).filter(_.isDefined).map(_.get)

Would this be the recommended way of creating the Observable? My fear is that it might be inefficient due to constant polling of the queue for values.

Oleg Pyzhcov
@oleg-py
Yes, I'd expect this to poll constantly, potentially taking away whole thread
My preference would be to replace the Queue with a Subject
The second best is to dequeue everything, but execute a Task every once in a while
Michael Zuber
@mgzuber
Replacing Queue with Subject still has the problem of having to create an Observable
Oleg Pyzhcov
@oleg-py
Subject is an observable :)
Michael Zuber
@mgzuber
yeah exactly
Oleg Pyzhcov
@oleg-py
So if you can replace your enqueue with onNext, on, say ConcurrentSubject.publish, you should be all set
Michael Zuber
@mgzuber
Yes!
Okay that's what I was looking for
Thanks for your help @oleg-py :)
Oleg Pyzhcov
@oleg-py
You're welcome :)
Yohann B
@ybr
Hi, I am looking for some way to have the same behaviour as Observable.bufferTimedWithPressure(period, maxSize), but not counting the elements viewed by the Observable. Instead counting on the internal structure of the elements, is there such operation on observable ? Something like Observable[A].bufferTimedWithPressure2(period, maxSize: Int, sizer: A => Int). The idea is each element contains lots of data and I am not sure that inlining them is an effective way to process things.
Yohann B
@ybr
hum just saw monix/monix#447
Alexandru Nedelcu
@alexandru

New PR: monix/monix#738

scuddalo
@scuddalo
@alexandru @Avasil monix noob question: I have a observable that fetches items from Amazon SQS, transforms the payload and calls some db function to persist to postgresql. it looks something like this: EventQueueListener SQSObservable.apply(queueUrl) .collect(deserialize) .consumeWith(msgHandler) .runAsync. How can I control the number of payloads that are processed at any point in time? When I run this the db CPU goes through the roof. Am I thinking about this wrong?
Piotr Gawryś
@Avasil
Something like max N payloads per second?
You could use 2nd Observable with one of the interval functions and zip it with your SQSObservable. This way you can limit it to processing 1 payload per specific duration. You could also take it in batches and then wait etc.
Should be quite simple to achieve any custom behavior you need
scuddalo
@scuddalo
thanks! Instead of writing a custom sqs observable... I was thinking of something like this:Observable .repeatEval(sqsClient.receiveMessage(queueUrl)) .flatMap(m => Observable.fromIterable(...)) .collect(deserialize) .mapParallelUnordered(1)(handlePayload)
@Avasil would mapParallelUnordered back pressure the source observable in this case?
Piotr Gawryś
@Avasil
I think the problem is that even processing elements one-by-one you are too fast
so in both cases you will process 1 element at a time
scuddalo
@scuddalo
oh..