Task
s that are not yet attached to execution contexts should be serializable out of the box, no?
Observable
that waits for input? I imagine if you're streaming an Observable
from a file and there's a delay on disk IO, it will patiently wait before getting the next bits and processing them. Is there a way to get the same behaviour waiting for input from my program?
Task
versions whenever possible, there are also for traverse
, parTraverse
and normal sequence
+ unordered variants
NoSuchMethodError
when running KafkaConsumerObservable.mapTask(???).consumerWith(???).runAsync
->java.lang.NoSuchMethodError: monix.eval.Task$Context.frameRef()Lmonix/eval/Task$FrameIndexRef;
at monix.kafka.KafkaConsumerObservable.$anonfun$feedTask$4(KafkaConsumerObservable.scala:90)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
.3.0.0-RC2-d0feeba
, monix-kafka 1.0.0-RC1
, and cats 1.0.0
. Does this mean that monix-kafka does not have a compatible version with monix 3.0.0-RC2 yet?
monix-kafka
is built on monix RC1
which is not binary compatible with monix RC2-hash
and cats-effect-1.0.0
monix-3.0.0-RC2
yet, just a hash build so I suspect monix-kafka
will release a new version when monix
is released
I have a quick question about Subject[I, O]
. All the examples and factory methods have I = O
. Also, the unimplemented methods are:
Subscriber[O] => Cancelable
(Observable[O])
and
I => Future[Ack]
(Observer[I])
There is no way to go from I
to O
, so it seems Subject
can never be used unless I = O
. So why is it parameterised by different types?
Consumer.loadBalance
it looks like this:val consumers =
List(("acc1", 500.millis), ("acc2", 300.millis))
.map {
case (account, delay) =>
Consumer.foreachTask((task: Long) =>
Task.sleep(delay).map(_ => println(s"[$account] processed $task")))
}
val finished =
Observable.range(0, 6)
.dump("source")
.consumeWith(Consumer.loadBalance(consumers: _*))
Await.result(finished.runAsync, 5.seconds)
MVar
.
Hi, I have this issue where the last element on a blocking Iterator is not passed through the pipeline. I'm looking at this piece of code in IteratorAsObservable
's fastLoop
val next = iter.next()
iteratorHasNext = iter.hasNext
streamErrors = false
ack = out.onNext(next)
looks like if the 2nd line is blocking (on an infinite Iterator for example) then the onNext
callback on the current element is never called? Did I miss something here? @alexandru ?
Many thanks!
ConcurrentSubject
:)