Piotr Gawryś
@Avasil
You don't have to know it (I've never ever used rx), those are just extra resources
Anyway, I'm always happy to receive feedback so if you end up giving Monix Observable a try, I'll be glad to help with any confusing names/descriptions and possibly improve it in the process, one by one
Piotr Gawryś
@Avasil
@ALPSMAC It should log 1 time per emitted element so yes, it sounds like something is delaying it downstream. I assume only 1 task is executed?
Lorenzo Gabriele
@lolgab
Hi, I don't know if this is the right channel, but I made a PR to update minitest to the latest Scala Native (removing some no-longer-necessary stuff like custom definitions of Future, Promise, Await and ExecutionContext). It would be nice to get a review of it and a release of the Scala Native 0.4.0-M2 version. I added the split between the two versions using an environment variable (SCALANATIVE_VERSION="0.4.0-M2" sbt to use the new one)
Piotr Gawryś
@Avasil
cc @alexandru but he's not very active lately
Lorenzo Gabriele
@lolgab
Thank you for answering. I hope he can take a look. It is a really straightforward PR, and the CI passes.
Mark
@mmgalea_twitter

Hi there, I'm reading the documentation at https://monix.io/docs/3x/best-practices/blocking.html

import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration.Duration

implicit val ec = ExecutionContext
  .fromExecutor(Executors.newFixedThreadPool(1))

def addOne(x: Int) = Future(x + 1)

def multiply(x: Int, y: Int) = Future {
  val a = addOne(x)
  val b = addOne(y)
  val result = for (r1 <- a; r2 <- b) yield r1 * r2

  // This can dead-lock due to the limited size 
  // of our thread-pool!
  Await.result(result, Duration.Inf)
}

I'm not sure how this can dead-lock; can someone explain, please?

Andy Polack
@ALPSMAC
@Avasil - thanks for confirming my intuition... 13 hours of work yesterday for me and I needed another pair of eyes to confirm... that gives me a place to start interrogating this morning. I appreciate the assist.
Yep - only one task is executed. Downstream from that I flatMap to an Observable that's attached to a Server Sent Event endpoint (converting via the Reactive Publisher spec from an Akka Streams Sink to a Monix Observable). I wonder if somehow the SSE is delaying the completion of the stream so that no further downstream elements are processed. I'll look into that today.
Loránd Szakács
@lorandszakacs
@mmgalea_twitter, the Future from multiply is already taking up one thread (i.e. all of your threads). It tries to start two new Futures with the calls to addOne, which never get to execute because they're spawned while your only thread is busy. Then you proceed to completely block that single thread with the Await, so the Futures spawned by addOne can never actually run
@mmgalea_twitter, meant a royal "you", not you specifically :)
Mark
@mmgalea_twitter
Gotcha, thanks man
But, wouldn't the addOne need to be awaited a priori in order to get the value r1?
so Await on a to get value r1, Await on b to get value r2, and finally Await on r1 * r2
Loránd Szakács
@lorandszakacs
@mmgalea_twitter, well, that's what happens behind the scenes, because those are the semantics of composing Future in for comprehensions (i.e. flatMap). And from the look of it, the deadlock would happen even with a single call to addOne: the moment you try to spawn a new Future (and schedule it on a new thread, the semantics of Future.apply), all your threads are already occupied. So there's nowhere to schedule the execution of addOne, and freeing up a thread depends on the termination of the Futures spawned by the addOne calls, which can never finish, because you just blocked all your resources with Await.result
Andy Polack
@ALPSMAC
@mmgalea_twitter I could be wrong here but I believe the issue is in the technical implementation of Future as an eagerly evaluated structure. Each time a flatMap/map is triggered the work is automatically dispatched onto another thread - it forces a context switch, which requires a minimum of 2 threads in the pool to execute the work reliably. I think of it as something akin to Spark Streaming (which requires a minimum of 2 spark cores allocated in order to propagate elements downstream). Consequently, I think - although I could be mistaken - that if you instead replaced the Future with Monix Task or Cats-Effect IO here you could avoid the eager evaluation and effectively end up with the same implementation you described - "await" on a, "await" on b, "await" on r1 * r2, do it all on a single thread.
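Andy's point about lazy evaluation can be illustrated without any library at all. Below is a toy thunk-based `IO` (a sketch only, NOT the real Monix Task or cats-effect IO API): `flatMap` merely builds up a description, and `run()` executes the whole chain on whatever thread calls it, so a single thread suffices and no deadlock is possible.

```scala
object LazyIOSketch {
  // Toy lazy IO: a minimal sketch, NOT the real Monix Task / cats-effect IO.
  // flatMap only builds a bigger description; run() executes everything
  // on the calling thread, so no second thread is ever needed.
  final case class IO[A](run: () => A) {
    def flatMap[B](f: A => IO[B]): IO[B] = IO(() => f(run()).run())
    def map[B](f: A => B): IO[B]         = IO(() => f(run()))
  }
  def delay[A](a: => A): IO[A] = IO(() => a)

  def addOne(x: Int): IO[Int] = delay(x + 1)

  def multiply(x: Int, y: Int): IO[Int] =
    for {
      r1 <- addOne(x)
      r2 <- addOne(y)
    } yield r1 * r2

  def main(args: Array[String]): Unit =
    // Nothing runs until .run(); a fixed pool of 1 could not dead-lock here.
    println(multiply(2, 3).run()) // prints 12
}
```

The real Task/IO types add trampolining, async boundaries and error handling on top, but the laziness shown here is the property that removes the need for a second thread.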
Loránd Szakács
@lorandszakacs

@mmgalea_twitter try running this:

object DeadlockedMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  implicit val ec = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(1))

  def addOne(x: Int) = Future {
    println(s"addOne($x) — ${Thread.currentThread().getName}")
    x + 1
  }

  def multiply(x: Int, y: Int) = Future {
    println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
    val a = addOne(x)
    val b = addOne(y)
    val result = for {
      r1 <- a
      r2 <- b
    } yield r1 * r2

    // This can dead-lock due to the limited size
    // of our thread-pool!
    Await.result(result, Duration.Inf)
  }

  multiply(2, 3)
}

I just added printlns w/ thread names, and you'll see that the Futures spawned by addOne never even get a chance to execute

Mark
@mmgalea_twitter
I removed the implicit and it's clearer where a thread is required in order to run the Future. Running your sample,
I understood what you meant with multiply(2, 3). Thanks
object FixedMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  implicit val ec = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(2))

  def addOne(x: Int) = Future {
    println(s"addOne($x) — ${Thread.currentThread().getName}")
    x + 1
  }

  def multiply(x: Int, y: Int) = Future {
    println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
    val a = addOne(x)
    val b = addOne(y)
    val result = for {
      r1 <- a
      r2 <- b
    } yield r1 * r2

    // With a pool of 2 threads the addOne Futures can run,
    // so this Await no longer dead-locks
    Await.result(result, Duration.Inf)
  }

  multiply(2, 3)
}
This is quite informative, seeing which threads are used.
Loránd Szakács
@lorandszakacs
Here's another sample w/ two ECs (that works); now you can see the different thread names, and how the Futures were executed
object TwoPoolsMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  val ecBlocking = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(1))

  val ecCPU = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  def addOne(x: Int) =
    Future {
      println(s"addOne($x) — ${Thread.currentThread().getName}")
      x + 1
    }(ecCPU)

  def multiply(x: Int, y: Int) =
    Future {
      println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
      val a = addOne(x)
      val b = addOne(y)

      implicit val neededForFlatMap = ecCPU

      val result = for {
        r1 <- a
        r2 <- b
      } yield r1 * r2

      // Await blocks ecBlocking's only thread, but the addOne Futures
      // run on ecCPU, so this no longer dead-locks
      Await.result(result, Duration.Inf)
    }(ecBlocking)

  multiply(2, 3)
}
Mark
@mmgalea_twitter
Thanks @lorandszakacs
Piotr Gawryś
@Avasil
@ALPSMAC if you flatMap to a stream that is using SSE, it won't process the next element until it is complete (that's how flatMap works for streams in general). If SSE takes very long or forever, you can consider running a few of them in parallel
  Observable
    .fromIterable(0 to 10)
    .flatMap(i => Observable.evalDelayed(1.second, println(i)))
    .completedL
    .runSyncUnsafe() // will print 1 per second
  Observable
    .fromIterable(0 to 10)
    .mergeMap(i => Observable.evalDelayed(1.second, println(i)))
    .completedL
    .runSyncUnsafe() // will print everything after 1 second
Not sure if that's your issue but I'm happy to help further if necessary :D
Andy Polack
@ALPSMAC
Thanks @Avasil - I think mergeMap may be what I was looking for. SSE does take forever - basically subscribing to an event stream until a particular event is observed, at which point the client should choose to disconnect. I'm monitoring multiple SSE endpoints for events like this - trying to do it with Observables and having trouble expressing:
a. monitoring all of them in parallel (mergeMap seems to be the proper combinator for this... thank you!!)
b. terminating all of the separate streams once I see the event I'm looking for... I'm currently using a takeWhile approach on the Observable wrapping the SSEs, but due to the concurrent merge the stream ends when the first SSE sees the event I'm looking for rather than when ALL SSEs have seen the given event... (I see a forall combinator, but I'm not 100% sure how to make use of it)
Piotr Gawryś
@Avasil
Do you know how many SSE streams you have?
Andy Polack
@ALPSMAC
Nope - determined at runtime
Monitoring a bunch of asynchronous processes that are tracking status of long-running tasks kicked off based on an out-of-band REST request.
I did look at using something like the zipMap operation... but found I ran into the problem where I don't know the number of SSEs until runtime.
Piotr Gawryś
@Avasil
There is also a zipList variant if you have a list. Does the number of SSE streams change dynamically, or do you know at some point (at runtime) that there are n SSE requests to run until all of them emit the given element? In the latter case it should be relatively easy to solve with a semaphore
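The "wait until all n streams have emitted the item" coordination Piotr mentions can be sketched with a plain `CountDownLatch` from the JDK, a stdlib stand-in for the semaphore (this is not Monix code; the per-stream sleeps and println are stand-ins for SSE events):

```scala
object LatchSketch {
  import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

  // Wait until all n "streams" signal DONE; returns true if all did in time.
  def awaitAllDone(n: Int): Boolean = {
    val latch = new CountDownLatch(n)   // one permit per stream
    val pool  = Executors.newFixedThreadPool(n)

    // Each "stream" does some work, then signals that it saw its DONE event.
    (1 to n).foreach { i =>
      pool.execute { () =>
        Thread.sleep(20L * i)           // simulate events arriving at different times
        println(s"stream $i saw DONE")
        latch.countDown()               // this stream has seen its DONE event
      }
    }

    // Block until every stream has reported DONE, then shut everything down.
    val allDone = latch.await(5, TimeUnit.SECONDS)
    pool.shutdown()
    allDone
  }

  def main(args: Array[String]): Unit =
    println(s"all streams done: ${awaitAllDone(3)}")
}
```

In Monix one would use a non-blocking `Semaphore`/`CountDownLatch` from `monix.catnap` rather than blocking a thread like this, but the coordination shape is the same.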
Oleg Pyzhcov
@oleg-py
@ALPSMAC why not then use takeWhile on each individual stream?
Piotr Gawryś
@Avasil
If my understanding is correct, he wants other streams to keep going until all of them emit specific item
Oleg Pyzhcov
@oleg-py
yeah, so you set the stop with takeWhile on each one and mergeMap them into a single stream.
Piotr Gawryś
@Avasil
Sorry, I meant all the streams to keep going (including those that emitted this item)
Andy Polack
@ALPSMAC
@Avasil and @oleg-py thanks for the feedback. Let me get a bit of pseudocode together to describe what I'm stumbling on in more detail.
Andy Polack
@ALPSMAC
val observable = Observable.fromIterable(requestIds).mergeMap { id =>
  Observable.fromReactivePublisher(SSEPublisher).takeWhileInclusive { status => status != TaskStatus.DONE }
}.filter { status => status == TaskStatus.DONE }

val consumer = Consumer.foreachTask[TaskStatus]{ status => persistStatusToDatabase(status) }

val task = observable.consumeWith(consumer)

task.runToFuture

Expected Behavior: Each SSE client is disconnected once it observes the first 'DONE' status message on its channel. The consumer sees all DONE messages across all channels and persists each to the database (presuming here in this pseudocode that the status object carries some sense of the "channel" it came from).

Actual Behavior: The first SSE channel to observe a DONE message wins, is committed to the database, and the stream appears to terminate.

I suspect I'm missing something fundamental here, but I figured since I was creating a bunch of Observables and mergeMap-ing them together, each with their own condition for termination (the takeWhileInclusive bit) that the end result would just be a stream of statuses from all channels, with the final message from each channel on the stream being the DONE event.

Oleg Pyzhcov
@oleg-py
Yeah, that's how I'd expect it to be. Could be a bug similar to one I fixed in #918.
Piotr Gawryś
@Avasil
And that's how it works in toy examples, e.g.
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.{Consumer, Observable}

implicit val s = Scheduler.global

val observable = Observable.range(1, 6)
  .mergeMap { _ =>
    Observable.range(0, 5).takeWhileInclusive{ _ != 3 }
  }
  .filter{ _ == 3 }

val consumer = Consumer.foreachTask[Long]{ status => Task(println(s"persist $status")) }

val task = observable.consumeWith(consumer)

task.runSyncUnsafe()
Oleg Pyzhcov
@oleg-py
What if you throttle that inner observable?
and make observables terminate at different intervals?
Andy Polack
@ALPSMAC
Yeah - let me give that a shot and see what happens.
Oleg Pyzhcov
@oleg-py
I'm looking for ways to break it, not fix it :D
Piotr Gawryś
@Avasil
.mergeMap { i =>
    Observable.range(0, 5).delayOnNext(100 * i.millis).takeWhileInclusive{ _ != 3 }
  }
still works unfortunately
Andy Polack
@ALPSMAC
for what it's worth, I'm using an unbounded cached thread pool to back the Scheduler I'm dispatching the work on.
Andy Polack
@ALPSMAC
Ah - actually I think maybe I'm onto something... I think persistStatusToDatabase(status) might be silently throwing an exception and killing the stream... I'm not sure why the exception isn't being reported when I use observable.doOnError{...}, but maybe it's because the exception is being thrown within the Consumer.
Yep - that's the cause! Thanks for the troubleshooting assist - greatly appreciate it!
Also - that mergeMap is exactly what I was looking for and for some reason absolutely couldn't find it yesterday - so thanks for that hint too!
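The failure mode Andy hit, a side effect throwing inside the consumer and silently ending the stream, can be guarded against by capturing the exception at the edge of the effect. A plain-Scala sketch with `Try` (the `persistStatus` function and the "BROKEN" trigger are hypothetical; in Monix the equivalent would be `Task(...).attempt` inside the consumer):

```scala
object SafePersistSketch {
  import scala.util.{Failure, Success, Try}

  // Hypothetical side effect that throws for one particular input.
  def persistStatus(status: String): Unit =
    if (status == "BROKEN") throw new RuntimeException("db write failed")

  // Capture each failure instead of letting it escape and kill the pipeline.
  def persistAll(statuses: List[String]): List[Try[Unit]] =
    statuses.map { s =>
      val result = Try(persistStatus(s))
      result match {
        case Failure(e) => println(s"persist of $s failed: ${e.getMessage}")
        case Success(_) => println(s"persisted $s")
      }
      result
    }

  def main(args: Array[String]): Unit =
    persistAll(List("DONE", "BROKEN", "DONE"))
}
```

With the exception materialized as a value, the remaining elements are still processed and the failure is visible in the logs instead of tearing the stream down.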
Piotr Gawryś
@Avasil
Awesome, glad to hear you managed to resolve your issue :D
Andy Polack
@ALPSMAC
Sincerely appreciate the prompt assistance - thanks @Avasil and @oleg-py !
Geovanny Junio
@geovannyjs

Hi, I am a newcomer to Monix and I am trying to do something like this:

import monix.eval.Task
import monix.execution.{CancelableFuture, Scheduler}

lazy val io = Scheduler.io(name="my-io")

val source = Task { 
  //..blocking IO here, actually waiting for the user fingerprint, may wait for hours...
}

// execute in another logical thread
val task: CancelableFuture[Unit] = source.executeOn(io).asyncBoundary.runToFuture

// I changed my mind and don't want to wait
// so I am trying to cancel
task.cancel()  // <- IT IS NOT WORKING

Basically I have a blocking IO operation and I am trying to execute it on another logical thread, but when I try to cancel it, nothing happens.
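This is the usual problem with cancelling blocking code: once a thread is parked inside a blocking call, cancellation has nothing to interrupt unless the code cooperates. A stdlib sketch of cooperative cancellation using an `AtomicBoolean` flag polled between short waits (not the Monix API; `waitForFingerprint` is a hypothetical stand-in for the fingerprint read):

```scala
object CooperativeCancelSketch {
  import java.util.concurrent.atomic.AtomicBoolean

  // Poll a cancellation flag between short blocking waits
  // instead of making one indefinite blocking call.
  def waitForFingerprint(cancelled: AtomicBoolean): Option[String] = {
    while (!cancelled.get()) {
      Thread.sleep(10) // stand-in for one short blocking poll of the device
      // a real implementation would return Some(data) once the scanner answers
    }
    None               // cancelled before any data arrived
  }

  def main(args: Array[String]): Unit = {
    val cancelled = new AtomicBoolean(false)
    val worker = new Thread(() => println(s"result: ${waitForFingerprint(cancelled)}"))
    worker.start()
    Thread.sleep(50)
    cancelled.set(true) // "I changed my mind" - the flag is observed promptly
    worker.join()
  }
}
```

Inside Monix the same idea applies: a `Task` wrapping uninterruptible blocking code has no safe point at which `cancel` can take effect, so the blocking loop itself has to check a flag (or the thread has to be interruptible) for cancellation to work.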