Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Raas Ahsan
@RaasAhsan
actually, won’t be necessary i think, seems like the incorrect definition of map2 was introduced after 2.x
Piotr Gawryś
@Avasil
@sherwinschiu maybe I'm missing some details but it sounds like you could just mapTask on KafkaConsumerObservable to do your processing and consumeWith KafkaProducerSink at the end
sherwinschiu
@sherwinschiu
Thanks @Avasil - what takes care of the polling?
SemanticBeeng
@SemanticBeeng
@yanana did you get anywhere with this https://gitter.im/monix/monix?at=58eb038008c00c092a65d6b7 , please ? Tasks that are not yet attached to execution contexts should be serializable out of the box, no?
Alexandru Nedelcu
@alexandru
@SemanticBeeng Task implements Serializable, but it depends on what you captured with it, as Task is basically a function.
Guess you should try it and see if it breaks or not 🙂 serialization in Java is unfortunately a dark art of sorts. I avoid it as much as possible, but guessing with Spark it is inevitable.
SemanticBeeng
@SemanticBeeng
Yes, spark does what it does. But it is quite useful to combine with monix, intuition suggests, because we can mix more complex functional business logic and go beyond the spark way of thinking.
Piotr Gawryś
@Avasil
@sherwinschiu polling is done internally
James Phillips
@jdrphillips
Is there a way to create an Observable that waits for input? I imagine if you're streaming an Observable from a file and there's a delay on disk IO, it will patiently wait before getting the next bits and processing them. Is there a way to get the same behaviour waiting for input from my program?
So anything subscribed to the observable will do nothing until an element is fed to it
Oleg Pyzhcov
@oleg-py
That's the default behavior. You might want to look into ConcurrentSubject to feed elements in imperative fashion
James Phillips
@jdrphillips
Thank you. I've been reading the Observable API and it didn't seem to do what I wanted there. I'll read about concurrent subjects
Ryan Zeigler
@rzeigler
is there any difference between gather and parSequence or is one just the cats.Parallel name for the operatino?
Piotr Gawryś
@Avasil
@rzeigler gather is optimized version just for Task and not every Traverse
I'd recommend using Task versions whenever possible, there are also for traverse, parTraverse and normal sequence + unordered variants
Ryan Zeigler
@rzeigler
is there a Task.parTraverse or is that just map(f).gather?
Piotr Gawryś
@Avasil
wander / wanderUnordered
sherwinschiu
@sherwinschiu
Hi. I ran into the following NoSuchMethodError when running KafkaConsumerObservable.mapTask(???).consumerWith(???).runAsync ->
java.lang.NoSuchMethodError: monix.eval.Task$Context.frameRef()Lmonix/eval/Task$FrameIndexRef; at monix.kafka.KafkaConsumerObservable.$anonfun$feedTask$4(KafkaConsumerObservable.scala:90) at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157).
I'm using monix 3.0.0-RC2-d0feeba, monix-kafka 1.0.0-RC1, and cats 1.0.0. Does this mean that monix-kafka does not have a compatible version with monix 3.0.0-RC2 yet?
Gabriel Volpe
@gvolpe
@sherwinschiu do you have cats-effect in your classpath too?
check the evictions but that seems to be a binary compatibility issue
Definitely for what I can see here: https://github.com/monix/monix-kafka/releases/tag/v1.0.0-RC1 monix-kafka is built on monix RC1 which is not binary compatible with monix RC2-hash and cats-effect-1.0.0
There's no monix-3.0.0-RC2 yet, just a hash build so I suspect monix-kafka will release a new version when monix is released
Mateusz Górski
@goral09
@sherwinschiu when I see NoSuchMethodError first thing I do is checking the dependencies and bin comp.
James Phillips
@jdrphillips

I have a quick question about Subject[I, O]. All the examples and factory methods have I = O. Also, the unimplemented methods are:

Subscriber[O] => Cancelable (Observable[O])

and

I => Future[Ack] (Observer[I])

There is no way to go from I to O, so it seems Subject can never be used unless I = O. So why is it parameterised by different types?

Oleg Pyzhcov
@oleg-py
@jdrphillips this will be made possible if #690 is done :)
You can now have a subject of different I and O by abusing covariance, but there's no practical application that I know of
Vasily Kirichenko
@vasily-kirichenko
I need to process tasks in parallel, by a preconfigured number of (stateful) workers. Consumer.loadBalance looks like a perfect fit, but the workers work like a function, not like a final consumers, so they cannot be at the end of the stream. Ideas?
Vasily Kirichenko
@vasily-kirichenko
With Consumer.loadBalance it looks like this:
val consumers =
    List(("acc1", 500.millis), ("acc2", 300.millis))
        .map {
            case (account, delay) =>
                Consumer.foreachTask((task: Long) =>
                    Task.sleep(delay).map(_ => println(s"[$account] processed $task")))
        }

val finished =
    Observable.range(0, 6)
        .dump("source")
        .consumeWith(Consumer.loadBalance(consumers: _*))

Await.result(finished.runAsync, 5.seconds)
But consumers provide results, which should be processed further by downstream, which seems impossible with ^^ approach.
Vasily Kirichenko
@vasily-kirichenko
It works, but I'm uncomfortable with the fact that there are actually two streams connected implicitly by a MVar.
Phong Nguyen
@phongngtuan

Hi, I have this issue where the last element on a blocking Iterator is not passed through the pipeline. I'm looking at this piece of code in IteratorAsObservable 's fastLoop

      val next = iter.next()
      iteratorHasNext = iter.hasNext
      streamErrors = false
      ack = out.onNext(next)

looks like if the 2nd line is blocking (on an infinite Iterator for example) then the onNext callback on the current element is never called? Did I miss something here? @alexandru ?
Many thanks!

Phong Nguyen
@phongngtuan
Another dumb question: if i were to implement my own Observable to use a callback, is there any contract I should conform?
Oleg Pyzhcov
@oleg-py
@phongngtuan re latest question, it's easier to just use subjects, but generally you have a backpressure/Ack ordering - so you can't call onNext until the Future from previous onNext has resolved with Continue
Phong Nguyen
@phongngtuan
@oleg-py that means in the Observable implementation I should wait for the Future[Ack] to complete? Possibly with an entrant lock?
Oleg Pyzhcov
@oleg-py
Wait - yes, and for onNext only - onComplete can be signalled concurrently, for instance.
It's easier to just use ConcurrentSubject :)
Phong Nguyen
@phongngtuan
@oleg-py How will ConcurrentSubject integrate with a callback-based Java API? The api can give me either a Java stream or a callback. I tried Observable.fromIterator but that seems to not work well with infinite iterator so I implement my own Observable instead. Anw now that you mention Subject, it seems similar to a Pipe isn’t it? I am also using pipe as I have a function Observable[A] => Observable[B] that I use with pipeThrough with unicast. Monix 2 had a transform method that is now removed but I don’t know the alternative, should I be using pipeThrough?
Oleg Pyzhcov
@oleg-py
With ConcurrentSubject, whenever the API calls you back, you just call onNext with received value and that's it.
And you can flat/map/etc. it like a regular observable
It is a way to create observables by imperatively feeding events into it. Not really much in common with pipes
Phong Nguyen
@phongngtuan
@oleg-py re: concurrent object. Got it, so I should construct a Subject and consume from it. I’m reading the doc but it’s not obvious how to construct one
Phong Nguyen
@phongngtuan
Re: pipe and subject. Subject can subscribe to a Observable and emits downstream, that looks like a pipe to me
Oleg Pyzhcov
@oleg-py
I've never subscribed a subject to an observable :)
Piotr Gawryś
@Avasil
one day it will be merged and easier to find :)
James Phillips
@jdrphillips
@oleg-py Thanks for your reply above!
Oleg Pyzhcov
@oleg-py
yw