Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
toxicafunk
@toxicafunk
but wasn't sure on the Fiber part
nice to know :thumbsup:
Fabio Labella
@SystemFw
feel free to ask more questions if you have further doubts
Joseph Denman
@JosephDenman

Thanks for your help

You are confusing concurrency and parallelism here. Threads and green threads are primarily about concurrency, which is a related but separate thing from parallelism

I don't think I am. Single threaded systems can be concurrent.

libraries that interact with cats-effect, so cats-effect itself, fs2 or Monix, all support this model of concurrency, which is represented by the Fiber interface

So, can you definitively claim that usages of parTraverse (and similar methods) do not make use of green threading because they do not use the Fiber interface?

Sorry, I'll be more specific.
*usages of parTraverse defined in the Parallel[Task, Task.Par] instance
Fabio Labella
@SystemFw

I don't think I am. Single threaded systems can be concurrent.

ofc they can (I mention it at the end of my answer with JS), I'm referring to the part when you talk about efficiency of parallel computations

So, can you definitively claim that usages of parTraverse (and similar methods) do not make use of green threading because they do not use the Fiber interface?

parTraverse does use green threading. I can be more specific, I was only giving an intro

Ryan Peters
@sloshy
When you run parTraverse it does not return a fiber directly as it runs all tasks to completion. You would get a fiber if you started the tasks independently though.
Joseph Denman
@JosephDenman
@SystemFw Please be more specific :)
Fabio Labella
@SystemFw
tl;dr a single Task has a runloop that executes it. Each of these running loops is a "green thread", in the sense that many of them can be multiplexed on the same thread pool (so, N: M threading, where M << N and can be even 1). Fiber is a type of handle over a runloop that let's you interrupt it or wait asynchronously for its completion. start: F[Fiber[F, A]] is the cats-effect method from the Concurrent typeclass that describes spawning one of these runloops. There are other, higher-level ways of tapping into this functionality. parTraverse is one of them
monix.Task participates in these abstractions, plus it has some other methods and implementation details which are unique to it and cats.effect.IO does not have
I'm giving a talk on this soon, going down to how things work fairly low level. It will focus on cats.effect.IO but 99% percent of it will apply to Monix too since Alex (Nedelcu), which is the main author of Monix, also contributed the bulk of this mechanism to cats-effect
Joseph Denman
@JosephDenman
@SystemFw Thanks very much!
ruslan
@unoexperto
Guys, what is analogue of akka's Source.unfold in Observable ? fromStateAction looks close but there is no stopping condition (although I can stop using takeWhile I guess)
ruslan
@unoexperto
And how do I throttle items coming from Observable ? I tried debounce, throttleLast, throttleFirst and sample but they finish stream prematurely.
Piotr Gawryś
@Avasil

Guys, what is analogue of akka's Source.unfold in Observable ? fromStateAction looks close but there is no stopping condition (although I can stop using takeWhile I guess)

That seems the way to do it. Having Observable.unfold might be still useful since it's quite known builder so I created an issue for it

And how do I throttle items coming from Observable ? I tried debounce, throttleLast, throttleFirst and sample but they finish stream prematurely.

Doesn't sound right, what's your Observable?

ruslan
@unoexperto
@Avasil Observable.fromIterable(0 to 10).sample(1 second). I run it on same thread executor though.
Piotr Gawryś
@Avasil
That's expected, your elements are emitted right away and sample will choose the latest in given time
You can try
Observable
    .fromIterable(0 to 10)
    .delayOnNext(200.millis)
    .sample(1.second)
What's your use case?
Andy Polack
@ALPSMAC

So I'm a little stuck on the usage of Observable. Is there any state that we should see something like this:

val l = List(1,2)
println(l.length)
val observable = Observable.fromIterable(l).flatMap{ i => 
  println("Inside FlatMap")
  Observable(i + 1) 
}
val task = observable.consumeWith(Consumer.foreach(_ => ()))
task.runToFuture

produce something other than something like this output?:

2
Inside FlatMap
Inside FlatMap
ruslan
@unoexperto
@Avasil I want to throttle emitting items by X items per Y time.
Andy Polack
@ALPSMAC

Currently I have:

      logger.debug("Keys Length: "+keys.length)
      Observable.fromIterable(keys).flatMap{ task =>
        logger.debug("AAAA")
        Observable.fromTask(task)
      }

where keys is of type List[Task[T]]. I see that keys.length is 2, but I only see "AAAA" printed once.

I have a feeling something perhaps further downstream is stalling processing... but I'm not quite sure how to verify that.

Piotr Gawryś
@Avasil

@unoexperto I think you're looking for

Observable
  .fromIterable(0 to 10)
  .bufferTimedWithPressure(1.second, 2)
  .flatMap(Observable.fromIterable)

How does it look like in Akka Streams?

Oh, it's just called throttle
ruslan
@unoexperto
@Avasil Thanks! In akka it's .throttle(number_of_items, FiniteDuration).
@Avasil It's my second attempt to use monix and I suspect it will be the last. Not sure why but monix requires a lot of mental effort even for basic stuff - names of functions are weird, behavior is weird, documentation is scarce. Just honest feedback, guys.
@Avasil To figure out throttling I tried to find in documentation first, then googled, then looked at tests, then came here. 2 hrs for obvious functionality.
On the other hand cats-effect (where author of monix contributes) is absolutely intuitive to me.
Piotr Gawryś
@Avasil
That's fair, I'l add this one to the documentation
Most of the names come from ReactiveX though
ruslan
@unoexperto
@Avasil Frankly I don't think it will help in long run with adoption. I think the best thing would be to have one-to-one mapping in documentation to every function of akka's Flow, Source and Sink.
toxicafunk
@toxicafunk
I find reactive X madness more useful though, since I was doing some in Java
Piotr Gawryś
@Avasil
yeah, that's arguable because it depends on someone's background
ruslan
@unoexperto
@toxicafunk Agreed. Monix might be friendly to those who came from vanilla Java.
Piotr Gawryś
@Avasil
e.g. meaning of throttle is different than the one in Akka Streams
ruslan
@unoexperto
If goal of monix author is to get Java's market then fine.
@Avasil how about names like debounce, throttleLast, throttleFirst and sample? :) It's hard to say from the name what these functions really do.
Piotr Gawryś
@Avasil
I agree but in case of Observable which is heavily inspired by ReactiveX (very similar model but with more FP flavor) there is a lot of value in being consistent. Huge part of the API that could have confusing docs will most likely be well documented elsewhere, e.g. http://reactivex.io/documentation/operators/sample.html
I also don't have idea for better names for many of these :D
ruslan
@unoexperto
@Avasil Good point on being inspired by ReactiveX. Although newcomers to monix may not figure out that they need to go elsewhere to understand how to use monix.
Piotr Gawryś
@Avasil
It's a good idea to point that somewhere at the top of docs
You don't have to know it (I've never ever used rx), those are just extra resources
Anyway, I'm always happy to receive feedback so if you end up giving Monix Observable a try, I'll be glad to help with any confusing names/descriptions and possibly improve it in the process, one by one
Piotr Gawryś
@Avasil
@ALPSMAC It should log 1 time per emitted element so yes, it sounds like something is delaying it downstream. I assume only 1 task is executed?
Lorenzo Gabriele
@lolgab
Hi, don't know if it is the right channel, but I made a PR to update minitest to latest Scala Native (removing some no more necessary stuff like custom definitions of Future, Promise, Await and ExecutionContext). It would be nice a review of it and a release of the Scala Native 0.4.0-M2 version ( I added the split between the two versions using an environment variable (SCALANATIVE_VERSION="0.4.0-M2" sbt to use the new one)
Piotr Gawryś
@Avasil
cc @alexandru but he's not very active lately
Lorenzo Gabriele
@lolgab
Thank you for answering. Hope he can give a look. It is a really straightforward PR. And the CI passes.
Mark
@mmgalea_twitter

Hi there, I'm reading the documentation, https://monix.io/docs/3x/best-practices/blocking.html

implicit val ec = ExecutionContext
  .fromExecutor(Executors.newFixedThreadPool(1))

def addOne(x: Int) = Future(x + 1)

def multiply(x: Int, y: Int) = Future {
  val a = addOne(x)
  val b = addOne(y)
  val result = for (r1 <- a; r2 <- b) yield r1 * r2

  // This can dead-lock due to the limited size 
  // of our thread-pool!
  Await.result(result, Duration.Inf)
}

Not sure how this can dead-lock, can someone explain please.

Andy Polack
@ALPSMAC
@Avasil - thanks for confirming my intuition... 13 hours of work yesterday for me and I needed another pair of eyes to confirm... that gives me a place to start interrogating this morning. I appreciate the assist.
Yep - only one task is executed. Downstream from that I flatMap to an Observable that's attached to a Server Sent Event endpoint (converting via the Reactive Publisher spec. from Akka Streams Sink to a Monix Observable. I wonder if somehow the SSE is delaying the completion of the stream so that no further downstream elements are processed. I'll look into that today.
Loránd Szakács
@lorandszakacs
@mmgalea_twitter, the Future from multiply is already taking up one thread (i.e. all of your threads), it tries to start two new Futures with the calls to addOne, which never get to be executed, because they're spawned while your only thread is blocked. Then you proceed to completely block that single thread you have with the Await, never actually being able to execute the ones spawned by addOne