James Phillips
@jdrphillips
So anything subscribed to the observable will do nothing until an element is fed to it
Oleg Pyzhcov
@oleg-py
That's the default behavior. You might want to look into ConcurrentSubject to feed elements in imperative fashion
James Phillips
@jdrphillips
Thank you. I've been reading the Observable API and it didn't seem to do what I wanted there. I'll read about concurrent subjects
Ryan Zeigler
@rzeigler
Is there any difference between gather and parSequence, or is one just the cats.Parallel name for the operation?
Piotr Gawryś
@Avasil
@rzeigler gather is an optimized version just for Task, not for every Traverse
I'd recommend using the Task versions whenever possible. There are also Task versions of traverse, parTraverse and normal sequence, plus unordered variants
Ryan Zeigler
@rzeigler
is there a Task.parTraverse or is that just map(f).gather?
Piotr Gawryś
@Avasil
wander / wanderUnordered
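A minimal sketch of the two spellings discussed above, assuming a Monix 3.x release where `Task.gather` and `Task.wander` are available (later 3.x releases renamed them to `parSequence` and `parTraverse`). The `double` function and the input list are hypothetical and exist only for illustration.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

// Hypothetical effectful function, used only for illustration.
def double(n: Int): Task[Int] = Task(n * 2)

val inputs = List(1, 2, 3)

// map + gather: build the list of tasks first, then run them in parallel.
val viaGather: Task[List[Int]] = Task.gather(inputs.map(double))

// wander: the parallel traverse, doing the map and the gather in one step.
val viaWander: Task[List[Int]] = Task.wander(inputs)(double)

// Both keep the results in the order of the inputs.
val gathered = viaGather.runSyncUnsafe(5.seconds)
val wandered = viaWander.runSyncUnsafe(5.seconds)
```

The unordered variants (`gatherUnordered`, `wanderUnordered`) give up the ordering guarantee, so they only fit when the order of results does not matter.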
sherwinschiu
@sherwinschiu
Hi. I ran into the following NoSuchMethodError when running KafkaConsumerObservable.mapTask(???).consumeWith(???).runAsync:
java.lang.NoSuchMethodError: monix.eval.Task$Context.frameRef()Lmonix/eval/Task$FrameIndexRef;
    at monix.kafka.KafkaConsumerObservable.$anonfun$feedTask$4(KafkaConsumerObservable.scala:90)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
I'm using monix 3.0.0-RC2-d0feeba, monix-kafka 1.0.0-RC1, and cats 1.0.0. Does this mean that monix-kafka does not have a compatible version with monix 3.0.0-RC2 yet?
Gabriel Volpe
@gvolpe
@sherwinschiu do you have cats-effect in your classpath too?
check the evictions but that seems to be a binary compatibility issue
Definitely, from what I can see here: https://github.com/monix/monix-kafka/releases/tag/v1.0.0-RC1 monix-kafka is built on monix RC1, which is not binary compatible with monix RC2-hash and cats-effect-1.0.0
There's no monix-3.0.0-RC2 yet, just a hash build so I suspect monix-kafka will release a new version when monix is released
Mateusz Górski
@goral09
@sherwinschiu when I see NoSuchMethodError, the first thing I do is check the dependencies and binary compatibility.
James Phillips
@jdrphillips

I have a quick question about Subject[I, O]. All the examples and factory methods have I = O. Also, the unimplemented methods are:

Subscriber[O] => Cancelable (Observable[O])

and

I => Future[Ack] (Observer[I])

There is no way to go from I to O, so it seems Subject can never be used unless I = O. So why is it parameterised by different types?

Oleg Pyzhcov
@oleg-py
@jdrphillips this will be made possible if #690 is done :)
You can now have a subject of different I and O by abusing covariance, but there's no practical application that I know of
Vasily Kirichenko
@vasily-kirichenko
I need to process tasks in parallel with a preconfigured number of (stateful) workers. Consumer.loadBalance looks like a perfect fit, but the workers act like functions, not like final consumers, so they cannot sit at the end of the stream. Ideas?
Vasily Kirichenko
@vasily-kirichenko
With Consumer.loadBalance it looks like this:
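import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Consumer, Observable}
import scala.concurrent.Await
import scala.concurrent.duration._

// One consumer per account, each simulating work with a different delay
val consumers =
    List(("acc1", 500.millis), ("acc2", 300.millis))
        .map {
            case (account, delay) =>
                Consumer.foreachTask((task: Long) =>
                    Task.sleep(delay).map(_ => println(s"[$account] processed $task")))
        }

// Distribute the source elements across the consumers
val finished =
    Observable.range(0, 6)
        .dump("source")
        .consumeWith(Consumer.loadBalance(consumers: _*))

Await.result(finished.runAsync, 5.seconds)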
But the consumers produce results that should be processed further downstream, which seems impossible with the ^^ approach.
Vasily Kirichenko
@vasily-kirichenko
It works, but I'm uncomfortable with the fact that there are actually two streams connected implicitly by an MVar.
Phong Nguyen
@phongngtuan

Hi, I have this issue where the last element of a blocking Iterator is not passed through the pipeline. I'm looking at this piece of code in IteratorAsObservable's fastLoop:

      val next = iter.next()
      iteratorHasNext = iter.hasNext
      streamErrors = false
      ack = out.onNext(next)

It looks like if the 2nd line blocks (on an infinite Iterator, for example), then the onNext callback for the current element is never called. Did I miss something here? @alexandru?
Many thanks!

Phong Nguyen
@phongngtuan
Another dumb question: if I were to implement my own Observable to use a callback, is there any contract I should conform to?
Oleg Pyzhcov
@oleg-py
@phongngtuan re latest question, it's easier to just use subjects, but generally you have a backpressure/Ack ordering - so you can't call onNext until the Future from previous onNext has resolved with Continue
Phong Nguyen
@phongngtuan
@oleg-py does that mean that in the Observable implementation I should wait for the Future[Ack] to complete? Possibly with a reentrant lock?
Oleg Pyzhcov
@oleg-py
Wait - yes, and for onNext only - onComplete can be signalled concurrently, for instance.
It's easier to just use ConcurrentSubject :)
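The ordering rule described above can be sketched as follows: a minimal illustration that pushes the next element only after the previous `onNext` has resolved with `Continue`, and stops on `Stop`. The `feed` helper and the recording observer are hypothetical names written for this example, not part of the conversation.

```scala
import monix.execution.Ack
import monix.execution.Ack.{Continue, Stop}
import monix.reactive.Observer
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper: emits items one by one, waiting for each Ack
// before calling onNext again.
def feed[A](out: Observer[A], items: List[A]): Future[Ack] =
  items match {
    case Nil => Future.successful(Continue)
    case head :: tail =>
      out.onNext(head).flatMap {
        case Continue => feed(out, tail)         // downstream is ready for more
        case Stop     => Future.successful(Stop) // downstream asked us to stop
      }
  }

// Hypothetical observer that records elements and stops after seeing 2.
val seen = ListBuffer.empty[Int]
val recorder = new Observer[Int] {
  def onNext(elem: Int): Future[Ack] = {
    seen += elem
    if (elem < 2) Continue else Stop
  }
  def onError(ex: Throwable): Unit = ()
  def onComplete(): Unit = ()
}

val lastAck = Await.result(feed(recorder, List(1, 2, 3)), 5.seconds)
```

Chaining on the returned `Future[Ack]` avoids blocking a thread or taking a lock, which is one reason a subject is usually the easier route for callback-driven sources.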
Phong Nguyen
@phongngtuan
@oleg-py How will ConcurrentSubject integrate with a callback-based Java API? The API can give me either a Java stream or a callback. I tried Observable.fromIterator, but that doesn't seem to work well with an infinite iterator, so I implemented my own Observable instead. Anyway, now that you mention Subject, it seems similar to a Pipe, doesn't it? I am also using a Pipe, as I have a function Observable[A] => Observable[B] that I use with pipeThrough with unicast. Monix 2 had a transform method that is now removed, but I don't know the alternative. Should I be using pipeThrough?
Oleg Pyzhcov
@oleg-py
With ConcurrentSubject, whenever the API calls you back, you just call onNext with received value and that's it.
And you can flat/map/etc. it like a regular observable
It is a way to create observables by imperatively feeding events into it. Not really much in common with pipes
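A minimal sketch of that idea, assuming Monix 3.x (where `runToFuture` is available). The `Listener` trait and `register` function are hypothetical stand-ins for the callback-based Java API; a replay strategy is used here only so the example does not depend on subscription timing.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.MulticastStrategy
import monix.reactive.subjects.ConcurrentSubject
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical callback-style API standing in for the Java library.
trait Listener {
  def onEvent(value: Int): Unit
  def onDone(): Unit
}
def register(listener: Listener): Unit = {
  (1 to 3).foreach(listener.onEvent)
  listener.onDone()
}

// The subject is both the thing the callback pushes into and a regular Observable.
val subject = ConcurrentSubject[Int](MulticastStrategy.replay)

// Downstream logic composes like on any other Observable.
val collected = subject.map(_ * 10).toListL.runToFuture

// Bridge: every callback invocation becomes an onNext on the subject.
register(new Listener {
  def onEvent(value: Int): Unit = { subject.onNext(value); () }
  def onDone(): Unit = subject.onComplete()
})

val result = Await.result(collected, 5.seconds)
```

With a publish strategy instead of replay, subscribers only see events emitted after they subscribe, so the subscription would need to be in place before the callback is registered.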
Phong Nguyen
@phongngtuan
@oleg-py re: concurrent subject. Got it, so I should construct a Subject and consume from it. I'm reading the docs, but it's not obvious how to construct one
Phong Nguyen
@phongngtuan
Re: pipe and subject. A Subject can subscribe to an Observable and emit downstream, which looks like a pipe to me
Oleg Pyzhcov
@oleg-py
I've never subscribed a subject to an observable :)
Piotr Gawryś
@Avasil
one day it will be merged and easier to find :)
James Phillips
@jdrphillips
@oleg-py Thanks for your reply above!
Oleg Pyzhcov
@oleg-py
yw
Phong Nguyen
@phongngtuan
thanks @oleg-py @Avasil :)
Michael Zuber
@mgzuber

Hi, wondering if someone could clarify some best practice advice to me.

I have a queue: scala.collection.mutable.Queue[T] that gets populated from some external source. I would like to create an Observable[T] from this queue that dequeues elements from the queue if they are there.

I'm able to quite easily do this by:

def dequeue: Task[Option[T]] = Task { if (queue.nonEmpty) Option(queue.dequeue()) else Option.empty }
val obs = Observable.repeatEvalF(dequeue).collect { case Some(value) => value }

Would this be the recommended way of creating the Observable? My fear is that it might be inefficient due to constant polling of the queue for values.

Oleg Pyzhcov
@oleg-py
Yes, I'd expect this to poll constantly, potentially taking up a whole thread
My preference would be to replace the Queue with a Subject
The second-best option is to dequeue everything at once, but execute the Task only every once in a while
Michael Zuber
@mgzuber
Replacing Queue with Subject still has the problem of having to create an Observable
Oleg Pyzhcov
@oleg-py
Subject is an observable :)
Michael Zuber
@mgzuber
yeah exactly
Oleg Pyzhcov
@oleg-py
So if you can replace your enqueue with onNext on, say, a ConcurrentSubject.publish, you should be all set