Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Piotr Gawryś
@Avasil
@crakjie The behavior is expected (it's mentioned in the scaladoc). I guess it can be confusing but I'd say it's like Observable.empty[Int] or List.empty[Int]
Nathaniel Fischer
@kag0
@Avasil so I'm trying it out, does this look right?
implicit val sched = TracingScheduler(ExecutionContext.global)
val local = Local(0)

val f1 = for {
  _ <- Future.successful("shift")
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 1- ${local()}")
    assert(local() == 0)
    local := 50
  }
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 1- ${local()}")
    assert(local() == 50)
  }
  _ <- Local.isolate(Future.successful{
    println(Thread.currentThread.getName + s" 1- ${local()}")
    assert(local() == 50)
    local := 100
  })
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 1- ${local()}")
    assert(local() == 50)
  }
} yield ()

val f2 = Local.isolate(for {
  _ <- Future.successful("shift")
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 2- ${local()}")
    assert(local() == 0)
    local := 5
  }
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 2- ${local()}")
    assert(local() == 5)
  }
  _ <- Local.isolate(Future.successful{
    println(Thread.currentThread.getName + s" 2- ${local()}")
    assert(local() == 5)
    local := 10
  })
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 2- ${local()}")
    assert(local() == 5)
  }

} yield ())

Await.result(f1, Duration.Inf)
Await.result(f2, Duration.Inf)
Piotr Gawryś
@Avasil
yeah
is that what you expect?
Nathaniel Fischer
@kag0
I think I find myself expecting the outer Local.isolate to be implied. But it makes sense when I think about it, and that could be invoked as part of a framework or something and logic code wouldn't need to worry about it.
I'm pleasantly surprised it works so cleanly.
Swapping the futures for IOs does not seem to work so well. I was hoping a IO.contextShift(TracingScheduler(ExecutionContext.global)) would do it but it seems like not.
Piotr Gawryś
@Avasil

Local.isolate won't work for lazy structures, Task uses:

    Task {
      val current = Local.getContext()
      Local.setContext(current.mkIsolated)
      current
    }.bracket(_ => task)(backup => Task(Local.setContext(backup)))

it might help a bit but there is also some logic encoded in Task run-loop itself

automatically isolating outer Future would be awesome but challenging to implement (we have to somehow detect that it is the first Future). Maybe one day x)
Nathaniel Fischer
@kag0
So how would one rewrite the above test to use Tasks rather than Futures?
Is it as simple as swapping all the Locals for TaskLocals? Because that wouldn't be too bad to start with Future and migrate later.
Piotr Gawryś
@Avasil
Yes, and if you call the Task separately, let's say runToFuture instead of Task.parZipthen you get outer isolate for free
in many cases it's probably all you need if you want to set sth like correlationId
Nathaniel Fischer
@kag0
Yes, that's exactly what I need to set 🙄 There are very few other cross cutting concerns I'd want to put in something like a Local, but when you need it nothing's better.
I'm interested to watch how it evolves in 3.x. It would be great if local context propagation made its way into cats, but I don't know how likely that really is.
Piotr Gawryś
@Avasil
I feel like many proficient cats users/maintainers prefer to use Reader / ApplicativeLocal for tracing so there is not enough demand to incorporate it into c.e.IO directly. I think @oleg-py who came up with our current Local model was thinking about supporting IO / F[_] without any support there so who knows, maybe it will happen one day
Nathaniel Fischer
@kag0
That makes sense. For me (not a super heavy cats user) I'd rather log a warning if a correlation id is missing on the thread than add a new type everywhere. Tracing just doesn't warrant the rigor.
Joseph Denman
@JosephDenman

Hi all,

My team is under the impression that (using the default global scheduler) calls to parTraverse make use of "green threads" and I'd like to evaluate the validity of that claim. I've seen elsewhere that JVM doesn't support green threads in the sense that JVM can't spawn hundreds of thousands of micro-threads that will magically make over-parallelized code more efficient. Is there a preferred resource for describing Task's relationship to green threading? What is Fiber's role?

toxicafunk
@toxicafunk
generally speaking, a Fiber is a green thread, but afaik monix does not implement them
Fabio Labella
@SystemFw
that's not true :)

. I've seen elsewhere that JVM doesn't support green threads in the sense that JVM can't spawn hundreds of thousands of micro-threads that will magically make over-parallelized code more efficient

You are confusing concurrency and parallelism here. Threads and green threads are primarily about concurrency, which is a related but separate thing from parallelism

. I've seen elsewhere that JVM doesn't support green threads I

The JVM doesn't support green threads in the sense that when you spawn a JVM thread (the Thread class), it maps one to one with an operating system thread

since operating system threads have a high cost of switching between them, you can only have so many of them, they are a scarce resource
in turn, this means that if you block a JVM thread (Thread) waiting for a result (e.g. an HTTP call), there is only so many you can block before you exhaust them all
green threads otoh are more lightweight, so you can spawn many more, and they all run using a small number of JVM threads
or, if you run on Javascript, only one thread
libraries that interact with cats-effect, so cats-effect itself, fs2 or Monix, all support this model of concurrency, which is represented by the Fiber interface
toxicafunk
@toxicafunk
oh, u get it through cats-effect :-D
Fabio Labella
@SystemFw
well, in the case of Monix is not like you get it from cats-effect, Monix.Task has it and it conforms to the cats-effect interface
Ryan Peters
@sloshy
It can be implemented differently at the library level - what's required of projects like Monix is that they merely implement the same typeclasses if you want interop, and maybe some conversion methods between other types where appropriate
Fabio Labella
@SystemFw
cats-effect IO also has it, with a very similar implementation, since Alex very very heavily contributed to it
toxicafunk
@toxicafunk
I remember that much
but wasn't sure on the Fiber part
nice to know :thumbsup:
Fabio Labella
@SystemFw
feel free to ask more questions if you have further doubts
Joseph Denman
@JosephDenman

Thanks for your help

You are confusing concurrency and parallelism here. Threads and green threads are primarily about concurrency, which is a related but separate thing from parallelism

I don't think I am. Single threaded systems can be concurrent.

libraries that interact with cats-effect, so cats-effect itself, fs2 or Monix, all support this model of concurrency, which is represented by the Fiber interface

So, can you definitively claim that usages of parTraverse (and similar methods) do not make use of green threading because they do not use the Fiber interface?

Sorry, I'll be more specific.
*usages of parTraverse defined in the Parallel[Task, Task.Par] instance
Fabio Labella
@SystemFw

I don't think I am. Single threaded systems can be concurrent.

ofc they can (I mention it at the end of my answer with JS), I'm referring to the part when you talk about efficiency of parallel computations

So, can you definitively claim that usages of parTraverse (and similar methods) do not make use of green threading because they do not use the Fiber interface?

parTraverse does use green threading. I can be more specific, I was only giving an intro

Ryan Peters
@sloshy
When you run parTraverse it does not return a fiber directly as it runs all tasks to completion. You would get a fiber if you started the tasks independently though.
Joseph Denman
@JosephDenman
@SystemFw Please be more specific :)
Fabio Labella
@SystemFw
tl;dr a single Task has a runloop that executes it. Each of these running loops is a "green thread", in the sense that many of them can be multiplexed on the same thread pool (so, N: M threading, where M << N and can be even 1). Fiber is a type of handle over a runloop that let's you interrupt it or wait asynchronously for its completion. start: F[Fiber[F, A]] is the cats-effect method from the Concurrent typeclass that describes spawning one of these runloops. There are other, higher-level ways of tapping into this functionality. parTraverse is one of them
monix.Task participates in these abstractions, plus it has some other methods and implementation details which are unique to it and cats.effect.IO does not have
I'm giving a talk on this soon, going down to how things work fairly low level. It will focus on cats.effect.IO but 99% percent of it will apply to Monix too since Alex (Nedelcu), which is the main author of Monix, also contributed the bulk of this mechanism to cats-effect
Joseph Denman
@JosephDenman
@SystemFw Thanks very much!
ruslan
@unoexperto
Guys, what is analogue of akka's Source.unfold in Observable ? fromStateAction looks close but there is no stopping condition (although I can stop using takeWhile I guess)
ruslan
@unoexperto
And how do I throttle items coming from Observable ? I tried debounce, throttleLast, throttleFirst and sample but they finish stream prematurely.
Piotr Gawryś
@Avasil

Guys, what is analogue of akka's Source.unfold in Observable ? fromStateAction looks close but there is no stopping condition (although I can stop using takeWhile I guess)

That seems the way to do it. Having Observable.unfold might be still useful since it's quite known builder so I created an issue for it

And how do I throttle items coming from Observable ? I tried debounce, throttleLast, throttleFirst and sample but they finish stream prematurely.

Doesn't sound right, what's your Observable?

ruslan
@unoexperto
@Avasil Observable.fromIterable(0 to 10).sample(1 second). I run it on same thread executor though.
Piotr Gawryś
@Avasil
That's expected, your elements are emitted right away and sample will choose the latest in given time
You can try
Observable
    .fromIterable(0 to 10)
    .delayOnNext(200.millis)
    .sample(1.second)
What's your use case?
Andy Polack
@ALPSMAC

So I'm a little stuck on the usage of Observable. Is there any state that we should see something like this:

val l = List(1,2)
println(l.length)
val observable = Observable.fromIterable(l).flatMap{ i => 
  println("Inside FlatMap")
  Observable(i + 1) 
}
val task = observable.consumeWith(Consumer.foreach(_ => ()))
task.runToFuture

produce something other than something like this output?:

2
Inside FlatMap
Inside FlatMap
ruslan
@unoexperto
@Avasil I want to throttle emitting items by X items per Y time.