Task
versions whenever possible, there are also for traverse
, parTraverse
and normal sequence
+ unordered variants
NoSuchMethodError
when running KafkaConsumerObservable.mapTask(???).consumerWith(???).runAsync
->java.lang.NoSuchMethodError: monix.eval.Task$Context.frameRef()Lmonix/eval/Task$FrameIndexRef;
at monix.kafka.KafkaConsumerObservable.$anonfun$feedTask$4(KafkaConsumerObservable.scala:90)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
.3.0.0-RC2-d0feeba
, monix-kafka 1.0.0-RC1
, and cats 1.0.0
. Does this mean that monix-kafka does not have a compatible version with monix 3.0.0-RC2 yet?
monix-kafka
is built on monix RC1
which is not binary compatible with monix RC2-hash
and cats-effect-1.0.0
monix-3.0.0-RC2
yet, just a hash build so I suspect monix-kafka
will release a new version when monix
is released
I have a quick question about Subject[I, O]
. All the examples and factory methods have I = O
. Also, the unimplemented methods are:
Subscriber[O] => Cancelable
(Observable[O])
and
I => Future[Ack]
(Observer[I])
There is no way to go from I
to O
, so it seems Subject
can never be used unless I = O
. So why is it parameterised by different types?
Consumer.loadBalance
it looks like this:val consumers =
List(("acc1", 500.millis), ("acc2", 300.millis))
.map {
case (account, delay) =>
Consumer.foreachTask((task: Long) =>
Task.sleep(delay).map(_ => println(s"[$account] processed $task")))
}
val finished =
Observable.range(0, 6)
.dump("source")
.consumeWith(Consumer.loadBalance(consumers: _*))
Await.result(finished.runAsync, 5.seconds)
MVar
.
Hi, I have this issue where the last element on a blocking Iterator is not passed through the pipeline. I'm looking at this piece of code in IteratorAsObservable
's fastLoop
val next = iter.next()
iteratorHasNext = iter.hasNext
streamErrors = false
ack = out.onNext(next)
looks like if the 2nd line is blocking (on an infinite Iterator for example) then the onNext
callback on the current element is never called? Did I miss something here? @alexandru ?
Many thanks!
ConcurrentSubject
:)
Hi, wondering if someone could clarify some best practice advice to me.
I have a queue: scala.collection.mutable.Queue[T]
that gets populated from some external source. I would like to create an Observable[T]
from this queue that dequeues elements from the queue if they are there.
I'm able to quite easily do this by:
def dequeue: Task[Option[T]] = Task { if (queue.nonEmpty) Option(queue.dequeue) else Option.empty }
val obs = Observable.repeatEvalF(dequeue).filter(_.isDefined).map(_.get)
Would this be the recommended way of creating the Observable? My fear is that it might be inefficient due to constant polling of the queue for values.