Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
ruslan
@unoexperto
Guys, what is analogue of akka's Source.unfold in Observable ? fromStateAction looks close but there is no stopping condition (although I can stop using takeWhile I guess)
ruslan
@unoexperto
And how do I throttle items coming from Observable ? I tried debounce, throttleLast, throttleFirst and sample but they finish stream prematurely.
Piotr Gawryś
@Avasil

Guys, what is analogue of akka's Source.unfold in Observable ? fromStateAction looks close but there is no stopping condition (although I can stop using takeWhile I guess)

That seems the way to do it. Having Observable.unfold might be still useful since it's quite known builder so I created an issue for it

And how do I throttle items coming from Observable ? I tried debounce, throttleLast, throttleFirst and sample but they finish stream prematurely.

Doesn't sound right, what's your Observable?

ruslan
@unoexperto
@Avasil Observable.fromIterable(0 to 10).sample(1 second). I run it on same thread executor though.
Piotr Gawryś
@Avasil
That's expected, your elements are emitted right away and sample will choose the latest in given time
You can try
Observable
    .fromIterable(0 to 10)
    .delayOnNext(200.millis)
    .sample(1.second)
What's your use case?
Andy Polack
@ALPSMAC

So I'm a little stuck on the usage of Observable. Is there any state that we should see something like this:

val l = List(1,2)
println(l.length)
val observable = Observable.fromIterable(l).flatMap{ i => 
  println("Inside FlatMap")
  Observable(i + 1) 
}
val task = observable.consumeWith(Consumer.foreach(_ => ()))
task.runToFuture

produce something other than something like this output?:

2
Inside FlatMap
Inside FlatMap
ruslan
@unoexperto
@Avasil I want to throttle emitting items by X items per Y time.
Andy Polack
@ALPSMAC

Currently I have:

      logger.debug("Keys Length: "+keys.length)
      Observable.fromIterable(keys).flatMap{ task =>
        logger.debug("AAAA")
        Observable.fromTask(task)
      }

where keys is of type List[Task[T]]. I see that keys.length is 2, but I only see "AAAA" printed once.

I have a feeling something perhaps further downstream is stalling processing... but I'm not quite sure how to verify that.

Piotr Gawryś
@Avasil

@unoexperto I think you're looking for

Observable
  .fromIterable(0 to 10)
  .bufferTimedWithPressure(1.second, 2)
  .flatMap(Observable.fromIterable)

How does it look like in Akka Streams?

Oh, it's just called throttle
ruslan
@unoexperto
@Avasil Thanks! In akka it's .throttle(number_of_items, FiniteDuration).
@Avasil It's my second attempt to use monix and I suspect it will be the last. Not sure why but monix requires a lot of mental effort even for basic stuff - names of functions are weird, behavior is weird, documentation is scarce. Just honest feedback, guys.
@Avasil To figure out throttling I tried to find in documentation first, then googled, then looked at tests, then came here. 2 hrs for obvious functionality.
On the other hand cats-effect (where author of monix contributes) is absolutely intuitive to me.
Piotr Gawryś
@Avasil
That's fair, I'l add this one to the documentation
Most of the names come from ReactiveX though
ruslan
@unoexperto
@Avasil Frankly I don't think it will help in long run with adoption. I think the best thing would be to have one-to-one mapping in documentation to every function of akka's Flow, Source and Sink.
toxicafunk
@toxicafunk
I find reactive X madness more useful though, since I was doing some in Java
Piotr Gawryś
@Avasil
yeah, that's arguable because it depends on someone's background
ruslan
@unoexperto
@toxicafunk Agreed. Monix might be friendly to those who came from vanilla Java.
Piotr Gawryś
@Avasil
e.g. meaning of throttle is different than the one in Akka Streams
ruslan
@unoexperto
If goal of monix author is to get Java's market then fine.
@Avasil how about names like debounce, throttleLast, throttleFirst and sample? :) It's hard to say from the name what these functions really do.
Piotr Gawryś
@Avasil
I agree but in case of Observable which is heavily inspired by ReactiveX (very similar model but with more FP flavor) there is a lot of value in being consistent. Huge part of the API that could have confusing docs will most likely be well documented elsewhere, e.g. http://reactivex.io/documentation/operators/sample.html
I also don't have idea for better names for many of these :D
ruslan
@unoexperto
@Avasil Good point on being inspired by ReactiveX. Although newcomers to monix may not figure out that they need to go elsewhere to understand how to use monix.
Piotr Gawryś
@Avasil
It's a good idea to point that somewhere at the top of docs
You don't have to know it (I've never ever used rx), those are just extra resources
Anyway, I'm always happy to receive feedback so if you end up giving Monix Observable a try, I'll be glad to help with any confusing names/descriptions and possibly improve it in the process, one by one
Piotr Gawryś
@Avasil
@ALPSMAC It should log 1 time per emitted element so yes, it sounds like something is delaying it downstream. I assume only 1 task is executed?
Lorenzo Gabriele
@lolgab
Hi, don't know if it is the right channel, but I made a PR to update minitest to latest Scala Native (removing some no more necessary stuff like custom definitions of Future, Promise, Await and ExecutionContext). It would be nice a review of it and a release of the Scala Native 0.4.0-M2 version ( I added the split between the two versions using an environment variable (SCALANATIVE_VERSION="0.4.0-M2" sbt to use the new one)
Piotr Gawryś
@Avasil
cc @alexandru but he's not very active lately
Lorenzo Gabriele
@lolgab
Thank you for answering. Hope he can give a look. It is a really straightforward PR. And the CI passes.
Mark
@mmgalea_twitter

Hi there, I'm reading the documentation, https://monix.io/docs/3x/best-practices/blocking.html

implicit val ec = ExecutionContext
  .fromExecutor(Executors.newFixedThreadPool(1))

def addOne(x: Int) = Future(x + 1)

def multiply(x: Int, y: Int) = Future {
  val a = addOne(x)
  val b = addOne(y)
  val result = for (r1 <- a; r2 <- b) yield r1 * r2

  // This can dead-lock due to the limited size 
  // of our thread-pool!
  Await.result(result, Duration.Inf)
}

Not sure how this can dead-lock, can someone explain please.

Andy Polack
@ALPSMAC
@Avasil - thanks for confirming my intuition... 13 hours of work yesterday for me and I needed another pair of eyes to confirm... that gives me a place to start interrogating this morning. I appreciate the assist.
Yep - only one task is executed. Downstream from that I flatMap to an Observable that's attached to a Server Sent Event endpoint (converting via the Reactive Publisher spec. from Akka Streams Sink to a Monix Observable. I wonder if somehow the SSE is delaying the completion of the stream so that no further downstream elements are processed. I'll look into that today.
Loránd Szakács
@lorandszakacs
@mmgalea_twitter, the Future from multiply is already taking up one thread (i.e. all of your threads), it tries to start two new Futures with the calls to addOne, which never get to be executed, because they're spawned while your only thread is blocked. Then you proceed to completely block that single thread you have with the Await, never actually being able to execute the ones spawned by addOne
@mmgalea_twitter, meant a royal "you", not you specifically :)
Mark
@mmgalea_twitter
Gotcha, thanks man
But, wouldn't the addOne need to be awaited a priori in order to get the value r1?
so Await on a to get value r1, Await on b to get value r2, and finally Await on r1 * r2
Loránd Szakács
@lorandszakacs
@mmgalea_twitter, well that's what happens behind the scenes, because that's the semantics of composing Future in for comprehensions (i.e. flatMap). And from the look of it, the deadlock would happen even with only one single call to addOne because the moment you try to spawn a new Future(and schedule it on a new thread, the semantics of Future.apply), all your threads are already occupied. So there's nowhere to schedule the execution of addOne, and since the freeing up of threads is dependent on the termination of the Futures spawned by addOne calls, which can never finish, because you just blocked all your resources w/ Await.result
Andy Polack
@ALPSMAC
@mmgalea_twitter I could be wrong here but I believe the issue is in the technical implementation of Future as an eagerly evaluated structure. Each time a flatMap/map is triggered the work is automatically dispatched onto another thread - it forces a context switch, which requires a minimum of 2 threads in the pool to execute the work reliably. I think of it as something akin to Spark Streaming (which requires a minimum of 2 spark cores allocated in order to propagate elements downstream). Consequently, I think - although I could be mistaken - that if you instead replaced the Future with Monix Task or Cats-Effect IO here you could avoid the eager evaluation and effectively end up with the same implementation you described - "await" on a, "await" on b, "await" on r1 * r2, do it all on a single thread.
Loránd Szakács
@lorandszakacs

@mmgalea_twitter try running this:

object DeadlockedMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  implicit val ec = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(1))

  def addOne(x: Int) = Future {
    println(s"addOne($x) — ${Thread.currentThread().getName}")
    x + 1
  }

  def multiply(x: Int, y: Int) = Future {
    println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
    val a = addOne(x)
    val b = addOne(y)
    val result = for {
      r1 <- a
      r2 <- b
    } yield r1 * r2

    // This can dead-lock due to the limited size
    // of our thread-pool!
    Await.result(result, Duration.Inf)
  }

  multiply(2, 3)
}

I just added printlns w/ thread names, and you'll see that the Futures spawned by addOne never even get a chance to execute

Mark
@mmgalea_twitter
Removed the implicit and its clearer where the thread is required in order to run the future. Running your sample
I understood what you meant with multiple(2, 3). Thanks
object FixedMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  implicit val ec = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(2))

  def addOne(x: Int) = Future {
    println(s"addOne($x) — ${Thread.currentThread().getName}")
    x + 1
  }

  def multiply(x: Int, y: Int) = Future {
    println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
    val a = addOne(x)
    val b = addOne(y)
    val result = for {
      r1 <- a
      r2 <- b
    } yield r1 * r2

    // This can dead-lock due to the limited size
    // of our thread-pool!
    Await.result(result, Duration.Inf)
  }

  multiply(2, 3)
}
This is quite informative, seeing which threads are used.
Loránd Szakács
@lorandszakacs
Here's another sample w/ two ECS (that works), and now you can see different thread names, and how the Futures were executed
object DeadlockedMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  val ecBlocking = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(1))

  val ecCPU = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  def addOne(x: Int) =
    Future {
      println(s"addOne($x) — ${Thread.currentThread().getName}")
      x + 1
    }(ecCPU)

  def multiply(x: Int, y: Int) =
    Future {
      println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
      val a = addOne(x)
      val b = addOne(y)

      implicit val neededForFlatMap = ecCPU

      val result = for {
        r1 <- a
        r2 <- b
      } yield r1 * r2

      // This can dead-lock due to the limited size
      // of our thread-pool!
      Await.result(result, Duration.Inf)
    }(ecBlocking)

  multiply(2, 3)
}
Mark
@mmgalea_twitter
Thanks @lorandszakacs
Piotr Gawryś
@Avasil
@ALPSMAC if you flatMap to stream that is using SSE it won't process next element until it is complete (it's how flatMap works for streams in general). If SSE takes very long or forever, you can consider running few of them in parallel
  Observable
    .fromIterable(0 to 10)
    .flatMap(i => Observable.evalDelayed(1.second, println(i)))
    .completedL
    .runSyncUnsafe() // will print 1 per second
  Observable
    .fromIterable(0 to 10)
    .mergeMap(i => Observable.evalDelayed(1.second, println(i)))
    .completedL
    .runSyncUnsafe() // will print everything after 1 second