Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Nathaniel Fischer
@kag0
So how would one rewrite the above test to use Tasks rather than Futures?
Is it as simple as swapping all the Locals for TaskLocals? Because that wouldn't be too bad to start with Future and migrate later.
Piotr Gawryƛ
@Avasil
Yes, and if you call the Task separately, let's say runToFuture instead of Task.parZipthen you get outer isolate for free
in many cases it's probably all you need if you want to set sth like correlationId
Nathaniel Fischer
@kag0
Yes, that's exactly what I need to set 🙄 There are very few other cross cutting concerns I'd want to put in something like a Local, but when you need it nothing's better.
I'm interested to watch how it evolves in 3.x. It would be great if local context propagation made its way into cats, but I don't know how likely that really is.
Piotr Gawryƛ
@Avasil
I feel like many proficient cats users/maintainers prefer to use Reader / ApplicativeLocal for tracing so there is not enough demand to incorporate it into c.e.IO directly. I think @oleg-py who came up with our current Local model was thinking about supporting IO / F[_] without any support there so who knows, maybe it will happen one day
Nathaniel Fischer
@kag0
That makes sense. For me (not a super heavy cats user) I'd rather log a warning if a correlation id is missing on the thread than add a new type everywhere. Tracing just doesn't warrant the rigor.
Joseph Denman
@JosephDenman

Hi all,

My team is under the impression that (using the default global scheduler) calls to parTraverse make use of "green threads" and I'd like to evaluate the validity of that claim. I've seen elsewhere that JVM doesn't support green threads in the sense that JVM can't spawn hundreds of thousands of micro-threads that will magically make over-parallelized code more efficient. Is there a preferred resource for describing Task's relationship to green threading? What is Fiber's role?

toxicafunk
@toxicafunk
generally speaking, a Fiber is a green thread, but afaik monix does not implement them
Fabio Labella
@SystemFw
that's not true :)

. I've seen elsewhere that JVM doesn't support green threads in the sense that JVM can't spawn hundreds of thousands of micro-threads that will magically make over-parallelized code more efficient

You are confusing concurrency and parallelism here. Threads and green threads are primarily about concurrency, which is a related but separate thing from parallelism

. I've seen elsewhere that JVM doesn't support green threads I

The JVM doesn't support green threads in the sense that when you spawn a JVM thread (the Thread class), it maps one to one with an operating system thread

since operating system threads have a high cost of switching between them, you can only have so many of them, they are a scarce resource
in turn, this means that if you block a JVM thread (Thread) waiting for a result (e.g. an HTTP call), there is only so many you can block before you exhaust them all
green threads otoh are more lightweight, so you can spawn many more, and they all run using a small number of JVM threads
or, if you run on Javascript, only one thread
libraries that interact with cats-effect, so cats-effect itself, fs2 or Monix, all support this model of concurrency, which is represented by the Fiber interface
toxicafunk
@toxicafunk
oh, u get it through cats-effect :-D
Fabio Labella
@SystemFw
well, in the case of Monix is not like you get it from cats-effect, Monix.Task has it and it conforms to the cats-effect interface
Ryan Peters
@sloshy
It can be implemented differently at the library level - what's required of projects like Monix is that they merely implement the same typeclasses if you want interop, and maybe some conversion methods between other types where appropriate
Fabio Labella
@SystemFw
cats-effect IO also has it, with a very similar implementation, since Alex very very heavily contributed to it
toxicafunk
@toxicafunk
I remember that much
but wasn't sure on the Fiber part
nice to know :thumbsup:
Fabio Labella
@SystemFw
feel free to ask more questions if you have further doubts
Joseph Denman
@JosephDenman

Thanks for your help

You are confusing concurrency and parallelism here. Threads and green threads are primarily about concurrency, which is a related but separate thing from parallelism

I don't think I am. Single threaded systems can be concurrent.

libraries that interact with cats-effect, so cats-effect itself, fs2 or Monix, all support this model of concurrency, which is represented by the Fiber interface

So, can you definitively claim that usages of parTraverse (and similar methods) do not make use of green threading because they do not use the Fiber interface?

Sorry, I'll be more specific.
*usages of parTraverse defined in the Parallel[Task, Task.Par] instance
Fabio Labella
@SystemFw

I don't think I am. Single threaded systems can be concurrent.

ofc they can (I mention it at the end of my answer with JS), I'm referring to the part when you talk about efficiency of parallel computations

So, can you definitively claim that usages of parTraverse (and similar methods) do not make use of green threading because they do not use the Fiber interface?

parTraverse does use green threading. I can be more specific, I was only giving an intro

Ryan Peters
@sloshy
When you run parTraverse it does not return a fiber directly as it runs all tasks to completion. You would get a fiber if you started the tasks independently though.
Joseph Denman
@JosephDenman
@SystemFw Please be more specific :)
Fabio Labella
@SystemFw
tl;dr a single Task has a runloop that executes it. Each of these running loops is a "green thread", in the sense that many of them can be multiplexed on the same thread pool (so, N: M threading, where M << N and can be even 1). Fiber is a type of handle over a runloop that let's you interrupt it or wait asynchronously for its completion. start: F[Fiber[F, A]] is the cats-effect method from the Concurrent typeclass that describes spawning one of these runloops. There are other, higher-level ways of tapping into this functionality. parTraverse is one of them
monix.Task participates in these abstractions, plus it has some other methods and implementation details which are unique to it and cats.effect.IO does not have
I'm giving a talk on this soon, going down to how things work fairly low level. It will focus on cats.effect.IO but 99% percent of it will apply to Monix too since Alex (Nedelcu), which is the main author of Monix, also contributed the bulk of this mechanism to cats-effect
Joseph Denman
@JosephDenman
@SystemFw Thanks very much!
ruslan
@unoexperto
Guys, what is analogue of akka's Source.unfold in Observable ? fromStateAction looks close but there is no stopping condition (although I can stop using takeWhile I guess)
ruslan
@unoexperto
And how do I throttle items coming from Observable ? I tried debounce, throttleLast, throttleFirst and sample but they finish stream prematurely.
Piotr Gawryƛ
@Avasil

Guys, what is analogue of akka's Source.unfold in Observable ? fromStateAction looks close but there is no stopping condition (although I can stop using takeWhile I guess)

That seems the way to do it. Having Observable.unfold might be still useful since it's quite known builder so I created an issue for it

And how do I throttle items coming from Observable ? I tried debounce, throttleLast, throttleFirst and sample but they finish stream prematurely.

Doesn't sound right, what's your Observable?

ruslan
@unoexperto
@Avasil Observable.fromIterable(0 to 10).sample(1 second). I run it on same thread executor though.
Piotr Gawryƛ
@Avasil
That's expected, your elements are emitted right away and sample will choose the latest in given time
You can try
Observable
    .fromIterable(0 to 10)
    .delayOnNext(200.millis)
    .sample(1.second)
What's your use case?
Andy Polack
@ALPSMAC

So I'm a little stuck on the usage of Observable. Is there any state that we should see something like this:

val l = List(1,2)
println(l.length)
val observable = Observable.fromIterable(l).flatMap{ i => 
  println("Inside FlatMap")
  Observable(i + 1) 
}
val task = observable.consumeWith(Consumer.foreach(_ => ()))
task.runToFuture

produce something other than something like this output?:

2
Inside FlatMap
Inside FlatMap
ruslan
@unoexperto
@Avasil I want to throttle emitting items by X items per Y time.
Andy Polack
@ALPSMAC

Currently I have:

      logger.debug("Keys Length: "+keys.length)
      Observable.fromIterable(keys).flatMap{ task =>
        logger.debug("AAAA")
        Observable.fromTask(task)
      }

where keys is of type List[Task[T]]. I see that keys.length is 2, but I only see "AAAA" printed once.

I have a feeling something perhaps further downstream is stalling processing... but I'm not quite sure how to verify that.

Piotr Gawryƛ
@Avasil

@unoexperto I think you're looking for

Observable
  .fromIterable(0 to 10)
  .bufferTimedWithPressure(1.second, 2)
  .flatMap(Observable.fromIterable)

How does it look like in Akka Streams?

Oh, it's just called throttle
ruslan
@unoexperto
@Avasil Thanks! In akka it's .throttle(number_of_items, FiniteDuration).
@Avasil It's my second attempt to use monix and I suspect it will be the last. Not sure why but monix requires a lot of mental effort even for basic stuff - names of functions are weird, behavior is weird, documentation is scarce. Just honest feedback, guys.
@Avasil To figure out throttling I tried to find in documentation first, then googled, then looked at tests, then came here. 2 hrs for obvious functionality.
On the other hand cats-effect (where author of monix contributes) is absolutely intuitive to me.
Piotr Gawryƛ
@Avasil
That's fair, I'l add this one to the documentation