monix-kafka
is built on monix RC1
which is not binary compatible with monix RC2-hash
and cats-effect-1.0.0
monix-3.0.0-RC2
yet, just a hash build so I suspect monix-kafka
will release a new version when monix
is released
I have a quick question about Subject[I, O]
. All the examples and factory methods have I = O
. Also, the unimplemented methods are:
Subscriber[O] => Cancelable
(Observable[O])
and
I => Future[Ack]
(Observer[I])
There is no way to go from I
to O
, so it seems Subject
can never be used unless I = O
. So why is it parameterised by different types?
Consumer.loadBalance
it looks like this:val consumers =
List(("acc1", 500.millis), ("acc2", 300.millis))
.map {
case (account, delay) =>
Consumer.foreachTask((task: Long) =>
Task.sleep(delay).map(_ => println(s"[$account] processed $task")))
}
val finished =
Observable.range(0, 6)
.dump("source")
.consumeWith(Consumer.loadBalance(consumers: _*))
Await.result(finished.runAsync, 5.seconds)
MVar
.
Hi, I have this issue where the last element on a blocking Iterator is not passed through the pipeline. I'm looking at this piece of code in IteratorAsObservable
's fastLoop
val next = iter.next()
iteratorHasNext = iter.hasNext
streamErrors = false
ack = out.onNext(next)
looks like if the 2nd line is blocking (on an infinite Iterator for example) then the onNext
callback on the current element is never called? Did I miss something here? @alexandru ?
Many thanks!
ConcurrentSubject
:)
Hi, wondering if someone could clarify some best practice advice to me.
I have a queue: scala.collection.mutable.Queue[T]
that gets populated from some external source. I would like to create an Observable[T]
from this queue that dequeues elements from the queue if they are there.
I'm able to quite easily do this by:
def dequeue: Task[Option[T]] = Task { if (queue.nonEmpty) Option(queue.dequeue) else Option.empty }
val obs = Observable.repeatEvalF(dequeue).filter(_.isDefined).map(_.get)
Would this be the recommended way of creating the Observable? My fear is that it might be inefficient due to constant polling of the queue for values.
Observable.bufferTimedWithPressure(period, maxSize)
, but not counting the elements viewed by the Observable. Instead counting on the internal structure of the elements, is there such operation on observable ? Something like Observable[A].bufferTimedWithPressure2(period, maxSize: Int, sizer: A => Int). The idea is each element contains lots of data and I am not sure that inlining them is an effective way to process things.
EventQueueListener
SQSObservable.apply(queueUrl)
.collect(deserialize)
.consumeWith(msgHandler)
.runAsync
. How can I control the number of payloads that are processed at any point in time? When I run this the db CPU goes through the roof. Am I thinking about this wrong?
Observable
with one of the interval
functions and zip
it with your SQSObservable
. This way you can limit it to processing 1 payload per specific duration. You could also take it in batches and then wait etc.