Joseph Denman
@JosephDenman
@SystemFw Please be more specific :)
Fabio Labella
@SystemFw
tl;dr a single Task has a runloop that executes it. Each of these running loops is a "green thread", in the sense that many of them can be multiplexed over the same thread pool (so N:M threading, where M << N and M can even be 1). Fiber is a type of handle over a runloop that lets you interrupt it or wait asynchronously for its completion. start: F[Fiber[F, A]] is the cats-effect method from the Concurrent typeclass that describes spawning one of these runloops. There are other, higher-level ways of tapping into this functionality; parTraverse is one of them
monix.Task participates in these abstractions, and it also has some other methods and implementation details that are unique to it and that cats.effect.IO does not have
I'm giving a talk on this soon, going down to how things work at a fairly low level. It will focus on cats.effect.IO, but 99% of it will apply to Monix too, since Alex (Nedelcu), who is the main author of Monix, also contributed the bulk of this mechanism to cats-effect
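[Editor's note: the start/Fiber mechanics described above can be sketched in code. This is a minimal, illustrative example against the cats-effect 2.x style API, where start requires an implicit ContextShift; the names and numbers are not from the conversation.]

```scala
// Minimal sketch of start/Fiber, assuming cats-effect 2.x.
// start spawns a new runloop ("green thread") and hands back a Fiber,
// which can be joined (wait for the result) or canceled (interrupt it).
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

object StartSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program: IO[Int] =
    for {
      fiber  <- IO(21 * 2).start // spawn a concurrent runloop
      result <- fiber.join       // asynchronously await its completion
    } yield result
}
```

Higher-level combinators such as parTraverse build on exactly this start/join/cancel machinery.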
Joseph Denman
@JosephDenman
@SystemFw Thanks very much!
ruslan
@unoexperto
Guys, what is the analogue of Akka's Source.unfold in Observable? fromStateAction looks close, but there is no stopping condition (although I can stop using takeWhile, I guess)
ruslan
@unoexperto
And how do I throttle items coming from an Observable? I tried debounce, throttleLast, throttleFirst, and sample, but they finish the stream prematurely.
Piotr Gawryś
@Avasil

Guys, what is the analogue of Akka's Source.unfold in Observable? fromStateAction looks close, but there is no stopping condition (although I can stop using takeWhile, I guess)

That seems like the way to do it. Having Observable.unfold might still be useful, since it's quite a well-known builder, so I created an issue for it

And how do I throttle items coming from an Observable? I tried debounce, throttleLast, throttleFirst, and sample, but they finish the stream prematurely.

Doesn't sound right, what's your Observable?
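[Editor's note: the fromStateAction + takeWhile approach discussed above can be sketched like this. A minimal example assuming Monix 3.x on the classpath; the counting logic is illustrative.]

```scala
// Emulating Akka's Source.unfold: fromStateAction produces an infinite
// stream from a state-transition function, and takeWhile supplies the
// stopping condition that unfold would have built in.
import monix.reactive.Observable

object UnfoldSketch {
  val counted: Observable[Int] =
    Observable
      .fromStateAction[Int, Int](s => (s, s + 1))(0) // infinite: 0, 1, 2, ...
      .takeWhile(_ < 5)                              // stop after 4
}
```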

ruslan
@unoexperto
@Avasil Observable.fromIterable(0 to 10).sample(1.second). I run it on a same-thread executor though.
Piotr Gawryś
@Avasil
That's expected: your elements are emitted right away, and sample will pick the latest one in each time window
You can try
Observable
    .fromIterable(0 to 10)
    .delayOnNext(200.millis)
    .sample(1.second)
What's your use case?
Andy Polack
@ALPSMAC

So I'm a little stuck on the usage of Observable. Is there any situation in which something like this:

val l = List(1,2)
println(l.length)
val observable = Observable.fromIterable(l).flatMap{ i => 
  println("Inside FlatMap")
  Observable(i + 1) 
}
val task = observable.consumeWith(Consumer.foreach(_ => ()))
task.runToFuture

would produce something other than output like this?:

2
Inside FlatMap
Inside FlatMap
ruslan
@unoexperto
@Avasil I want to throttle emitted items to X items per Y time.
Andy Polack
@ALPSMAC

Currently I have:

      logger.debug("Keys Length: "+keys.length)
      Observable.fromIterable(keys).flatMap{ task =>
        logger.debug("AAAA")
        Observable.fromTask(task)
      }

where keys is of type List[Task[T]]. I see that keys.length is 2, but I only see "AAAA" printed once.

I have a feeling something perhaps further downstream is stalling processing... but I'm not quite sure how to verify that.

Piotr Gawryś
@Avasil

@unoexperto I think you're looking for

Observable
  .fromIterable(0 to 10)
  .bufferTimedWithPressure(1.second, 2)
  .flatMap(Observable.fromIterable)

What does it look like in Akka Streams?

Oh, it's just called throttle
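[Editor's note: for comparison, a minimal Akka Streams sketch of the same rate limit. It assumes Akka Streams 2.6.x on the classpath; the 2-elements-per-second figures mirror the bufferTimedWithPressure example above.]

```scala
// In Akka Streams the same "at most 2 elements per second" limit
// is a single throttle call on the Source.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

object ThrottleSketch extends App {
  implicit val system: ActorSystem = ActorSystem("throttle-sketch")

  Source(0 to 10)
    .throttle(2, 1.second)          // at most 2 elements per second
    .runWith(Sink.foreach(println)) // emits 0..10, paced by the throttle
    .onComplete(_ => system.terminate())(system.dispatcher)
}
```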
ruslan
@unoexperto
@Avasil Thanks! In Akka it's .throttle(number_of_items, FiniteDuration).
@Avasil It's my second attempt to use Monix and I suspect it will be the last. Not sure why, but Monix requires a lot of mental effort even for basic stuff: names of functions are weird, behavior is weird, documentation is scarce. Just honest feedback, guys.
@Avasil To figure out throttling I tried to find it in the documentation first, then googled, then looked at the tests, then came here. 2 hours for obvious functionality.
On the other hand, cats-effect (to which the author of Monix contributes) is absolutely intuitive to me.
Piotr Gawryś
@Avasil
That's fair, I'll add this one to the documentation
Most of the names come from ReactiveX though
ruslan
@unoexperto
@Avasil Frankly, I don't think it will help adoption in the long run. I think the best thing would be a one-to-one mapping in the documentation to every function of Akka's Flow, Source, and Sink.
toxicafunk
@toxicafunk
I find the ReactiveX madness more useful though, since I was doing some of it in Java
Piotr Gawryś
@Avasil
Yeah, that's arguable, because it depends on someone's background
ruslan
@unoexperto
@toxicafunk Agreed. Monix might be friendly to those who came from vanilla Java.
Piotr Gawryś
@Avasil
e.g. the meaning of throttle is different from the one in Akka Streams
ruslan
@unoexperto
If the goal of the Monix authors is to capture the Java market, then fine.
@Avasil How about names like debounce, throttleLast, throttleFirst, and sample? :) It's hard to tell from the name what these functions really do.
Piotr Gawryś
@Avasil
I agree, but in the case of Observable, which is heavily inspired by ReactiveX (a very similar model but with more FP flavor), there is a lot of value in being consistent. A huge part of the API that could have confusing docs will most likely be well documented elsewhere, e.g. http://reactivex.io/documentation/operators/sample.html
I also don't have ideas for better names for many of these :D
ruslan
@unoexperto
@Avasil Good point on being inspired by ReactiveX. Although newcomers to Monix may not figure out that they need to go elsewhere to understand how to use it.
Piotr Gawryś
@Avasil
It's a good idea to point that out somewhere at the top of the docs
You don't have to know it (I've never used Rx myself); those are just extra resources
Anyway, I'm always happy to receive feedback, so if you end up giving Monix Observable a try, I'll be glad to help with any confusing names/descriptions and possibly improve them in the process, one by one
Piotr Gawryś
@Avasil
@ALPSMAC It should log once per emitted element, so yes, it sounds like something is delaying it downstream. I assume only 1 task is executed?
Lorenzo Gabriele
@lolgab
Hi, I don't know if this is the right channel, but I made a PR to update minitest to the latest Scala Native, removing some no-longer-necessary stuff like custom definitions of Future, Promise, Await, and ExecutionContext. It would be nice to get a review of it and a release of the Scala Native 0.4.0-M2 version. I added the split between the two versions using an environment variable (SCALANATIVE_VERSION="0.4.0-M2" sbt to use the new one)
Piotr Gawryś
@Avasil
cc @alexandru but he's not very active lately
Lorenzo Gabriele
@lolgab
Thank you for answering. I hope he can take a look. It is a really straightforward PR, and the CI passes.
Mark
@mmgalea_twitter

Hi there, I'm reading the documentation, https://monix.io/docs/3x/best-practices/blocking.html

implicit val ec = ExecutionContext
  .fromExecutor(Executors.newFixedThreadPool(1))

def addOne(x: Int) = Future(x + 1)

def multiply(x: Int, y: Int) = Future {
  val a = addOne(x)
  val b = addOne(y)
  val result = for (r1 <- a; r2 <- b) yield r1 * r2

  // This can dead-lock due to the limited size 
  // of our thread-pool!
  Await.result(result, Duration.Inf)
}

Not sure how this can deadlock; can someone explain, please?

Andy Polack
@ALPSMAC
@Avasil - thanks for confirming my intuition... 13 hours of work yesterday for me and I needed another pair of eyes to confirm... that gives me a place to start interrogating this morning. I appreciate the assist.
Yep - only one task is executed. Downstream from that I flatMap to an Observable that's attached to a Server-Sent Events endpoint (converting via the Reactive Publisher spec from an Akka Streams Sink to a Monix Observable). I wonder if somehow the SSE is delaying the completion of the stream so that no further downstream elements are processed. I'll look into that today.
Loránd Szakács
@lorandszakacs
@mmgalea_twitter, the Future from multiply is already taking up one thread (i.e. all of your threads). It tries to start two new Futures with the calls to addOne, which never get to execute, because they're spawned while your only thread is busy. Then you proceed to completely block that single thread with Await, so the Futures spawned by addOne are never able to run
@mmgalea_twitter, I meant a royal "you", not you specifically :)
Mark
@mmgalea_twitter
Gotcha, thanks man
But wouldn't addOne need to be awaited a priori in order to get the value r1?
So, Await on a to get r1, Await on b to get r2, and finally Await on r1 * r2?
Loránd Szakács
@lorandszakacs
@mmgalea_twitter, well, that's what happens behind the scenes, because that's the semantics of composing Future in for comprehensions (i.e. flatMap). And from the look of it, the deadlock would happen even with a single call to addOne: the moment you try to spawn a new Future (and schedule it on a thread, the semantics of Future.apply), all your threads are already occupied. So there's nowhere to schedule the execution of addOne, and freeing up a thread depends on the termination of the Futures spawned by the addOne calls, which can never finish, because you just blocked all your resources with Await.result
Andy Polack
@ALPSMAC
@mmgalea_twitter I could be wrong here, but I believe the issue is in the technical implementation of Future as an eagerly evaluated structure. Each time a flatMap/map is triggered, the work is automatically dispatched onto another thread; it forces a context switch, which requires a minimum of 2 threads in the pool to execute the work reliably. I think of it as something akin to Spark Streaming (which requires a minimum of 2 Spark cores allocated in order to propagate elements downstream). Consequently I think, although I could be mistaken, that if you instead replaced the Future with a Monix Task or cats-effect IO here, you could avoid the eager evaluation and effectively end up with the same implementation you described: "await" on a, "await" on b, "await" on r1 * r2, all on a single thread.
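[Editor's note: the suggestion above, replacing Future with a lazy effect type, can be sketched as follows. A minimal example assuming Monix 3.x; Scheduler.singleThread is used to show the chain completing on one thread.]

```scala
// The multiply example rewritten with Task. Task is lazy: nothing runs
// until the final runSyncUnsafe, there is no Await inside the effect,
// and the whole chain executes fine on a single scheduler thread.
import monix.eval.Task
import monix.execution.Scheduler

object TaskSketch {
  def addOne(x: Int): Task[Int] = Task(x + 1)

  def multiply(x: Int, y: Int): Task[Int] =
    for {
      r1 <- addOne(x)
      r2 <- addOne(y)
    } yield r1 * r2

  def main(args: Array[String]): Unit = {
    implicit val s: Scheduler = Scheduler.singleThread("single")
    println(multiply(2, 3).runSyncUnsafe()) // prints 12: (2 + 1) * (3 + 1)
  }
}
```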
Loránd Szakács
@lorandszakacs

@mmgalea_twitter try running this:

object DeadlockedMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  implicit val ec = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(1))

  def addOne(x: Int) = Future {
    println(s"addOne($x) — ${Thread.currentThread().getName}")
    x + 1
  }

  def multiply(x: Int, y: Int) = Future {
    println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
    val a = addOne(x)
    val b = addOne(y)
    val result = for {
      r1 <- a
      r2 <- b
    } yield r1 * r2

    // This can dead-lock due to the limited size
    // of our thread-pool!
    Await.result(result, Duration.Inf)
  }

  multiply(2, 3)
}

I just added printlns with thread names, and you'll see that the Futures spawned by addOne never even get a chance to execute

Mark
@mmgalea_twitter
Removed the implicit and it's clearer where the thread is required in order to run the Future. Running your sample.