Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Sergey Torgashov
@satorg
Hi there. Could someone explain me, please, what is the difference between Task.map2 and Task.parMap2. The documentation states that the former is sequential, although the latter is parallel. But it seems, that the implementation of these two are the same except a subtle difference: Task.parMap2 just calls Task.mapBoth directly, although Task.map2 calls non-static (but final anyway) Task.zipMap first, which, in turn, calls that Task.mapBoth. So, in my understanding, they both should behave in the same way. What does make one of them sequential and the other one parallel?
Yifan Xing
@xingyif
Hi @/all, I am creating a GH repo that explains how to organize a scalabridge workshop (so that many people can do so in different parts of the world). Scalabridge is a workshop that teaches underrepresented students the basics of programming in Scala using Creative Scala tutorial. It is a great way to grow the community. I am organizing a list of mentors (based on location) who are interested in helping out, so that a new organizer has mentors to reach out to when organizing a workshop. If you're interested in helping out as a scalabridge mentor, please create a PR and put your info in this file: https://github.com/xingyif/scalabridge/blob/master/organize-a-workshop/invite-mentors.md#interested--mentors
Thanks
Raas Ahsan
@RaasAhsan
@satorg It seems like that was a bug. If you check upstream, it has been corrected to execute sequentially https://github.com/monix/monix/blob/master/monix-eval/shared/src/main/scala/monix/eval/Task.scala#L3093
won’t be available in 2.x series though probably
Sergey Torgashov
@satorg
@RaasAhsan oh, indeed, it was fixed on Apr 25. Thanks for pointing it out.
Alexandru Nedelcu
@alexandru
@satorg @RaasAhsan If any of you are interested in patching the 2.x series, do a PR for this branch: https://github.com/monix/monix/tree/series/2.x — and I'll publish a patch release afterwards.
sherwinschiu
@sherwinschiu
Hi, I'm trying to setup an app that will consume a Kafka stream and produce to a Kafka stream with some processing in the middle which is currently built using Tasks. What's the recommended way to set this up? I'm currently trying to use monix-kafka's KafkaConsumerObservable and in the process of building out an Observer which I believe means eventually I need convert the ConsumerRecord into my data type and kick off my processing within onNext(). Does that mean I need to run the Task within onNext()? Is consumeWith a better option? And would that mean I need to handle polling?
Raas Ahsan
@RaasAhsan
@alexandru Sure, might as well for fun :)
Raas Ahsan
@RaasAhsan
actually, won’t be necessary i think, seems like the incorrect definition of map2 was introduced after 2.x
Piotr Gawryś
@Avasil
@sherwinschiu maybe I'm missing some details but it sounds like you could just mapTask on KafkaConsumerObservable to do your processing and consumeWith KafkaProducerSink at the end
sherwinschiu
@sherwinschiu
Thanks @Avasil - what takes care of the polling?
SemanticBeeng
@SemanticBeeng
@yanana did you get anywhere with this https://gitter.im/monix/monix?at=58eb038008c00c092a65d6b7 , please ? Tasks that are not yet attached to execution contexts should be serializable out of the box, no?
Alexandru Nedelcu
@alexandru
@SemanticBeeng Task implements Serializable, but it depends on what you captured with it, as Task is basically a function.
Guess you should try it and see if it breaks or not 🙂 serialization in Java is unfortunately a dark art of sorts. I avoid it as much as possible, but guessing with Spark it is inevitable.
SemanticBeeng
@SemanticBeeng
Yes, spark does what it does. But it is quite useful to combine with monix, intuition suggests, because we can mix more complex functional business logic and go beyond the spark way of thinking.
Piotr Gawryś
@Avasil
@sherwinschiu polling is done internally
James Phillips
@jdrphillips
Is there a way to create an Observable that waits for input? I imagine if you're streaming an Observable from a file and there's a delay on disk IO, it will patiently wait before getting the next bits and processing them. Is there a way to get the same behaviour waiting for input from my program?
So anything subscribed to the observable will do nothing until an element is fed to it
Oleg Pyzhcov
@oleg-py
That's the default behavior. You might want to look into ConcurrentSubject to feed elements in imperative fashion
James Phillips
@jdrphillips
Thank you. I've been reading the Observable API and it didn't seem to do what I wanted there. I'll read about concurrent subjects
Ryan Zeigler
@rzeigler
is there any difference between gather and parSequence or is one just the cats.Parallel name for the operatino?
Piotr Gawryś
@Avasil
@rzeigler gather is optimized version just for Task and not every Traverse
I'd recommend using Task versions whenever possible, there are also for traverse, parTraverse and normal sequence + unordered variants
Ryan Zeigler
@rzeigler
is there a Task.parTraverse or is that just map(f).gather?
Piotr Gawryś
@Avasil
wander / wanderUnordered
sherwinschiu
@sherwinschiu
Hi. I ran into the following NoSuchMethodError when running KafkaConsumerObservable.mapTask(???).consumerWith(???).runAsync ->
java.lang.NoSuchMethodError: monix.eval.Task$Context.frameRef()Lmonix/eval/Task$FrameIndexRef; at monix.kafka.KafkaConsumerObservable.$anonfun$feedTask$4(KafkaConsumerObservable.scala:90) at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157).
I'm using monix 3.0.0-RC2-d0feeba, monix-kafka 1.0.0-RC1, and cats 1.0.0. Does this mean that monix-kafka does not have a compatible version with monix 3.0.0-RC2 yet?
Gabriel Volpe
@gvolpe
@sherwinschiu do you have cats-effect in your classpath too?
check the evictions but that seems to be a binary compatibility issue
Definitely for what I can see here: https://github.com/monix/monix-kafka/releases/tag/v1.0.0-RC1 monix-kafka is built on monix RC1 which is not binary compatible with monix RC2-hash and cats-effect-1.0.0
There's no monix-3.0.0-RC2 yet, just a hash build so I suspect monix-kafka will release a new version when monix is released
Mateusz Górski
@goral09
@sherwinschiu when I see NoSuchMethodError first thing I do is checking the dependencies and bin comp.
James Phillips
@jdrphillips

I have a quick question about Subject[I, O]. All the examples and factory methods have I = O. Also, the unimplemented methods are:

Subscriber[O] => Cancelable (Observable[O])

and

I => Future[Ack] (Observer[I])

There is no way to go from I to O, so it seems Subject can never be used unless I = O. So why is it parameterised by different types?

Oleg Pyzhcov
@oleg-py
@jdrphillips this will be made possible if #690 is done :)
You can now have a subject of different I and O by abusing covariance, but there's no practical application that I know of
Vasily Kirichenko
@vasily-kirichenko
I need to process tasks in parallel, by a preconfigured number of (stateful) workers. Consumer.loadBalance looks like a perfect fit, but the workers work like a function, not like a final consumers, so they cannot be at the end of the stream. Ideas?
Vasily Kirichenko
@vasily-kirichenko
With Consumer.loadBalance it looks like this:
val consumers =
    List(("acc1", 500.millis), ("acc2", 300.millis))
        .map {
            case (account, delay) =>
                Consumer.foreachTask((task: Long) =>
                    Task.sleep(delay).map(_ => println(s"[$account] processed $task")))
        }

val finished =
    Observable.range(0, 6)
        .dump("source")
        .consumeWith(Consumer.loadBalance(consumers: _*))

Await.result(finished.runAsync, 5.seconds)
But consumers provide results, which should be processed further by downstream, which seems impossible with ^^ approach.
Vasily Kirichenko
@vasily-kirichenko
It works, but I'm uncomfortable with the fact that there are actually two streams connected implicitly by a MVar.
Phong Nguyen
@phongngtuan

Hi, I have this issue where the last element on a blocking Iterator is not passed through the pipeline. I'm looking at this piece of code in IteratorAsObservable 's fastLoop

      val next = iter.next()
      iteratorHasNext = iter.hasNext
      streamErrors = false
      ack = out.onNext(next)

looks like if the 2nd line is blocking (on an infinite Iterator for example) then the onNext callback on the current element is never called? Did I miss something here? @alexandru ?
Many thanks!

Phong Nguyen
@phongngtuan
Another dumb question: if i were to implement my own Observable to use a callback, is there any contract I should conform?
Oleg Pyzhcov
@oleg-py
@phongngtuan re latest question, it's easier to just use subjects, but generally you have a backpressure/Ack ordering - so you can't call onNext until the Future from previous onNext has resolved with Continue
Phong Nguyen
@phongngtuan
@oleg-py that means in the Observable implementation I should wait for the Future[Ack] to complete? Possibly with an entrant lock?
Oleg Pyzhcov
@oleg-py
Wait - yes, and for onNext only - onComplete can be signalled concurrently, for instance.
It's easier to just use ConcurrentSubject :)
Phong Nguyen
@phongngtuan
@oleg-py How will ConcurrentSubject integrate with a callback-based Java API? The api can give me either a Java stream or a callback. I tried Observable.fromIterator but that seems to not work well with infinite iterator so I implement my own Observable instead. Anw now that you mention Subject, it seems similar to a Pipe isn’t it? I am also using pipe as I have a function Observable[A] => Observable[B] that I use with pipeThrough with unicast. Monix 2 had a transform method that is now removed but I don’t know the alternative, should I be using pipeThrough?
Oleg Pyzhcov
@oleg-py
With ConcurrentSubject, whenever the API calls you back, you just call onNext with received value and that's it.
And you can flat/map/etc. it like a regular observable