Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
SemanticBeeng
@SemanticBeeng
Yes, spark does what it does. But it is quite useful to combine with monix, intuition suggests, because we can mix more complex functional business logic and go beyond the spark way of thinking.
Piotr Gawryś
@Avasil
@sherwinschiu polling is done internally
James Phillips
@jdrphillips
Is there a way to create an Observable that waits for input? I imagine if you're streaming an Observable from a file and there's a delay on disk IO, it will patiently wait before getting the next bits and processing them. Is there a way to get the same behaviour waiting for input from my program?
So anything subscribed to the observable will do nothing until an element is fed to it
Oleg Pyzhcov
@oleg-py
That's the default behavior. You might want to look into ConcurrentSubject to feed elements in imperative fashion
James Phillips
@jdrphillips
Thank you. I've been reading the Observable API and it didn't seem to do what I wanted there. I'll read about concurrent subjects
Ryan Zeigler
@rzeigler
is there any difference between gather and parSequence or is one just the cats.Parallel name for the operatino?
Piotr Gawryś
@Avasil
@rzeigler gather is optimized version just for Task and not every Traverse
I'd recommend using Task versions whenever possible, there are also for traverse, parTraverse and normal sequence + unordered variants
Ryan Zeigler
@rzeigler
is there a Task.parTraverse or is that just map(f).gather?
Piotr Gawryś
@Avasil
wander / wanderUnordered
sherwinschiu
@sherwinschiu
Hi. I ran into the following NoSuchMethodError when running KafkaConsumerObservable.mapTask(???).consumerWith(???).runAsync ->
java.lang.NoSuchMethodError: monix.eval.Task$Context.frameRef()Lmonix/eval/Task$FrameIndexRef; at monix.kafka.KafkaConsumerObservable.$anonfun$feedTask$4(KafkaConsumerObservable.scala:90) at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157).
I'm using monix 3.0.0-RC2-d0feeba, monix-kafka 1.0.0-RC1, and cats 1.0.0. Does this mean that monix-kafka does not have a compatible version with monix 3.0.0-RC2 yet?
Gabriel Volpe
@gvolpe
@sherwinschiu do you have cats-effect in your classpath too?
check the evictions but that seems to be a binary compatibility issue
Definitely for what I can see here: https://github.com/monix/monix-kafka/releases/tag/v1.0.0-RC1 monix-kafka is built on monix RC1 which is not binary compatible with monix RC2-hash and cats-effect-1.0.0
There's no monix-3.0.0-RC2 yet, just a hash build so I suspect monix-kafka will release a new version when monix is released
Mateusz Górski
@goral09
@sherwinschiu when I see NoSuchMethodError first thing I do is checking the dependencies and bin comp.
James Phillips
@jdrphillips

I have a quick question about Subject[I, O]. All the examples and factory methods have I = O. Also, the unimplemented methods are:

Subscriber[O] => Cancelable (Observable[O])

and

I => Future[Ack] (Observer[I])

There is no way to go from I to O, so it seems Subject can never be used unless I = O. So why is it parameterised by different types?

Oleg Pyzhcov
@oleg-py
@jdrphillips this will be made possible if #690 is done :)
You can now have a subject of different I and O by abusing covariance, but there's no practical application that I know of
Vasily Kirichenko
@vasily-kirichenko
I need to process tasks in parallel, by a preconfigured number of (stateful) workers. Consumer.loadBalance looks like a perfect fit, but the workers work like a function, not like a final consumers, so they cannot be at the end of the stream. Ideas?
Vasily Kirichenko
@vasily-kirichenko
With Consumer.loadBalance it looks like this:
val consumers =
    List(("acc1", 500.millis), ("acc2", 300.millis))
        .map {
            case (account, delay) =>
                Consumer.foreachTask((task: Long) =>
                    Task.sleep(delay).map(_ => println(s"[$account] processed $task")))
        }

val finished =
    Observable.range(0, 6)
        .dump("source")
        .consumeWith(Consumer.loadBalance(consumers: _*))

Await.result(finished.runAsync, 5.seconds)
But consumers provide results, which should be processed further by downstream, which seems impossible with ^^ approach.
Vasily Kirichenko
@vasily-kirichenko
It works, but I'm uncomfortable with the fact that there are actually two streams connected implicitly by a MVar.
Phong Nguyen
@phongngtuan

Hi, I have this issue where the last element on a blocking Iterator is not passed through the pipeline. I'm looking at this piece of code in IteratorAsObservable 's fastLoop

      val next = iter.next()
      iteratorHasNext = iter.hasNext
      streamErrors = false
      ack = out.onNext(next)

looks like if the 2nd line is blocking (on an infinite Iterator for example) then the onNext callback on the current element is never called? Did I miss something here? @alexandru ?
Many thanks!

Phong Nguyen
@phongngtuan
Another dumb question: if i were to implement my own Observable to use a callback, is there any contract I should conform?
Oleg Pyzhcov
@oleg-py
@phongngtuan re latest question, it's easier to just use subjects, but generally you have a backpressure/Ack ordering - so you can't call onNext until the Future from previous onNext has resolved with Continue
Phong Nguyen
@phongngtuan
@oleg-py that means in the Observable implementation I should wait for the Future[Ack] to complete? Possibly with an entrant lock?
Oleg Pyzhcov
@oleg-py
Wait - yes, and for onNext only - onComplete can be signalled concurrently, for instance.
It's easier to just use ConcurrentSubject :)
Phong Nguyen
@phongngtuan
@oleg-py How will ConcurrentSubject integrate with a callback-based Java API? The api can give me either a Java stream or a callback. I tried Observable.fromIterator but that seems to not work well with infinite iterator so I implement my own Observable instead. Anw now that you mention Subject, it seems similar to a Pipe isn’t it? I am also using pipe as I have a function Observable[A] => Observable[B] that I use with pipeThrough with unicast. Monix 2 had a transform method that is now removed but I don’t know the alternative, should I be using pipeThrough?
Oleg Pyzhcov
@oleg-py
With ConcurrentSubject, whenever the API calls you back, you just call onNext with received value and that's it.
And you can flat/map/etc. it like a regular observable
It is a way to create observables by imperatively feeding events into it. Not really much in common with pipes
Phong Nguyen
@phongngtuan
@oleg-py re: concurrent object. Got it, so I should construct a Subject and consume from it. I’m reading the doc but it’s not obvious how to construct one
Phong Nguyen
@phongngtuan
Re: pipe and subject. Subject can subscribe to a Observable and emits downstream, that looks like a pipe to me
Oleg Pyzhcov
@oleg-py
I've never subscribed a subject to an observable :)
Piotr Gawryś
@Avasil
one day it will be merged and easier to find :)
James Phillips
@jdrphillips
@oleg-py Thanks for your reply above!
Oleg Pyzhcov
@oleg-py
yw
Phong Nguyen
@phongngtuan
thanks @oleg-py @Avasil :)
Michael Zuber
@mgzuber

Hi, wondering if someone could clarify some best practice advice to me.

I have a queue: scala.collection.mutable.Queue[T] that gets populated from some external source. I would like to create an Observable[T] from this queue that dequeues elements from the queue if they are there.

I'm able to quite easily do this by:

def dequeue: Task[Option[T]] = Task { if (queue.nonEmpty) Option(queue.dequeue) else Option.empty }
val obs = Observable.repeatEvalF(dequeue).filter(_.isDefined).map(_.get)

Would this be the recommended way of creating the Observable? My fear is that it might be inefficient due to constant polling of the queue for values.

Oleg Pyzhcov
@oleg-py
Yes, I'd expect this to poll constantly, potentially taking away whole thread
My preference would be to replace the Queue with a Subject
The second best is to dequeue everything, but execute a Task every once in a while
Michael Zuber
@mgzuber
Replacing Queue with Subject still has the problem of having to create an Observable