Mark
@mmgalea_twitter
Gotcha, thanks man
But wouldn't the addOne calls need to be awaited a priori in order to get the value r1?
So: Await on a to get value r1, Await on b to get value r2, and finally Await on the result r1 * r2
Loránd Szakács
@lorandszakacs
@mmgalea_twitter, well, that's what happens behind the scenes, because that's the semantics of composing Futures in for comprehensions (i.e. flatMap). And from the look of it, the deadlock would happen even with a single call to addOne: the moment you try to spawn a new Future (and schedule it on a new thread, which is the semantics of Future.apply), all your threads are already occupied. So there's nowhere to schedule the execution of addOne, and freeing up threads depends on the termination of the Futures spawned by the addOne calls, which can never finish, because you just blocked all your resources w/ Await.result
Andy Polack
@ALPSMAC
@mmgalea_twitter I could be wrong here but I believe the issue is in the technical implementation of Future as an eagerly evaluated structure. Each time a flatMap/map is triggered the work is automatically dispatched onto another thread - it forces a context switch, which requires a minimum of 2 threads in the pool to execute the work reliably. I think of it as something akin to Spark Streaming (which requires a minimum of 2 spark cores allocated in order to propagate elements downstream). Consequently, I think - although I could be mistaken - that if you instead replaced the Future with Monix Task or Cats-Effect IO here you could avoid the eager evaluation and effectively end up with the same implementation you described - "await" on a, "await" on b, "await" on r1 * r2, do it all on a single thread.
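The eager-vs-lazy distinction Andy points at can be sketched without any library: a Future starts running the moment it is constructed, while a Task/IO value only *describes* the work and runs nothing until asked. A hypothetical, library-free illustration (the Eager and Lazy classes are stand-ins, not real Monix/cats-effect types):

```scala
import java.util.concurrent.atomic.AtomicInteger

object EagerVsLazy {
  // Eager: evaluates its body at construction time (like Future.apply)
  final class Eager[A](body: => A) { val result: A = body }

  // Lazy: defers its body until run() is called (like Monix Task / cats-effect IO)
  final class Lazy[A](body: => A) { def run(): A = body }

  def demo(): (Int, Int) = {
    val counter = new AtomicInteger(0)
    new Eager(counter.incrementAndGet())        // side effect happens immediately
    val l = new Lazy(counter.incrementAndGet()) // nothing happens yet
    val afterConstruction = counter.get         // only the eager effect has run
    l.run()                                     // now the lazy effect runs
    (afterConstruction, counter.get)
  }
}
```

Because a lazy value only runs when and where you interpret it, the runtime (not thread availability at construction time) decides how the steps are sequenced.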
Loránd Szakács
@lorandszakacs

@mmgalea_twitter try running this:

object DeadlockedMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  implicit val ec: ExecutionContext = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(1))

  def addOne(x: Int) = Future {
    println(s"addOne($x) — ${Thread.currentThread().getName}")
    x + 1
  }

  def multiply(x: Int, y: Int) = Future {
    println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
    val a = addOne(x)
    val b = addOne(y)
    val result = for {
      r1 <- a
      r2 <- b
    } yield r1 * r2

    // This dead-locks: multiply's Future occupies the pool's only
    // thread while it blocks here, so the addOne Futures can
    // never be scheduled.
    Await.result(result, Duration.Inf)
  }

  multiply(2, 3)
}

I just added printlns w/ thread names, and you'll see that the Futures spawned by addOne never even get a chance to execute
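The blocking itself is the root cause here: if multiply composes the Futures with flatMap instead of Await-ing inside the pool, even a single-threaded pool suffices. A sketch of that non-blocking variant, using only the standard library (the NonBlocking object name is mine):

```scala
import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration.Duration

object NonBlocking {
  private val pool = Executors.newFixedThreadPool(1)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  def addOne(x: Int): Future[Int] = Future(x + 1)

  // No Future.apply wrapper and no inner Await: just composition.
  def multiply(x: Int, y: Int): Future[Int] =
    for {
      r1 <- addOne(x)
      r2 <- addOne(y)
    } yield r1 * r2

  // Await only at the program's edge, on the caller's thread,
  // which is not part of the pool — so nothing deadlocks.
  def run(x: Int, y: Int): Int =
    Await.result(multiply(x, y), Duration.Inf)

  def shutdown(): Unit = pool.shutdown()
}
```

`NonBlocking.run(2, 3)` completes normally because the pool's single thread is only ever asked to run short, non-blocking steps.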

Mark
@mmgalea_twitter
Removed the implicit and it's clearer where the thread is required in order to run the Future. Running your sample,
I understood what you meant with multiply(2, 3). Thanks
object FixedMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  implicit val ec: ExecutionContext = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(2))

  def addOne(x: Int) = Future {
    println(s"addOne($x) — ${Thread.currentThread().getName}")
    x + 1
  }

  def multiply(x: Int, y: Int) = Future {
    println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
    val a = addOne(x)
    val b = addOne(y)
    val result = for {
      r1 <- a
      r2 <- b
    } yield r1 * r2

    // With two threads in the pool, the addOne Futures can run
    // on the free thread while this one blocks, so the Await
    // completes.
    Await.result(result, Duration.Inf)
  }

  multiply(2, 3)
}
This is quite informative, seeing which threads are used.
Loránd Szakács
@lorandszakacs
Here's another sample w/ two ECs (that works); now you can see different thread names and how the Futures were executed:
object TwoPoolsMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  val ecBlocking = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(1))

  val ecCPU = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  def addOne(x: Int) =
    Future {
      println(s"addOne($x) — ${Thread.currentThread().getName}")
      x + 1
    }(ecCPU)

  def multiply(x: Int, y: Int) =
    Future {
      println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
      val a = addOne(x)
      val b = addOne(y)

      implicit val neededForFlatMap: ExecutionContext = ecCPU

      val result = for {
        r1 <- a
        r2 <- b
      } yield r1 * r2

      // Blocking here is safe now: this Future runs on ecBlocking,
      // while the addOne Futures complete on ecCPU.
      Await.result(result, Duration.Inf)
    }(ecBlocking)

  multiply(2, 3)
}
Mark
@mmgalea_twitter
Thanks @lorandszakacs
Piotr Gawryś
@Avasil
@ALPSMAC if you flatMap to a stream that is using SSE, it won't process the next element until it completes (that's how flatMap works for streams in general). If an SSE takes very long, or forever, you can consider running a few of them in parallel:
  Observable
    .fromIterable(0 to 10)
    .flatMap(i => Observable.evalDelayed(1.second, println(i)))
    .completedL
    .runSyncUnsafe() // will print 1 per second
  Observable
    .fromIterable(0 to 10)
    .mergeMap(i => Observable.evalDelayed(1.second, println(i)))
    .completedL
    .runSyncUnsafe() // will print everything after 1 second
Not sure if that's your issue but I'm happy to help further if necessary :D
Andy Polack
@ALPSMAC
Thanks @Avasil - I think it may be mergeMap I was looking for. SSE does take forever - basically subscribing to an event stream until a particular event is observed, then client should make the choice to disconnect. Monitoring multiple SSE endpoints for events like this - trying to do it with Observables and having trouble expressing:
a. monitoring all of them in parallel (mergeMap seems to be the proper combinator for this... thank you!!)
b. terminating all of the separate streams once I see the event I'm looking for... currently using a takeWhile approach on the Observable wrapping the SSEs, but due to the concurrent merge the stream ends when the first SSE sees the event I'm looking for rather than when ALL SSEs have seen the given event... (I see a forall combinator, but I'm not 100% sure how to make use of it)
Piotr Gawryś
@Avasil
Do you know how many SSE streams you have?
Andy Polack
@ALPSMAC
Nope - determined at runtime
Monitoring a bunch of asynchronous processes that are tracking status of long-running tasks kicked off based on an out-of-band REST request.
I did look at using something like the zipMap operation... but found I ran into the problem where I don't know the number of SSEs until runtime.
Piotr Gawryś
@Avasil
There is also a zipList variant if you have a list. Does the number of SSE streams change dynamically, or do you know at some point (at runtime) that there are n SSE requests to run until all of them emit the given element? In the latter case it should be relatively easy to solve with a semaphore
Oleg Pyzhcov
@oleg-py
@ALPSMAC why not then use takeWhile on each individual stream?
Piotr Gawryś
@Avasil
If my understanding is correct, he wants other streams to keep going until all of them emit specific item
Oleg Pyzhcov
@oleg-py
yeah, so you set the stop with takeWhile on each one and mergeMap them into a single stream.
Piotr Gawryś
@Avasil
Sorry, I meant all the streams to keep going (including those that emitted this item)
Andy Polack
@ALPSMAC
@Avasil and @oleg-py thanks for the feedback. Let me get a bit of pseudocode together to describe what I'm stumbling on in more detail.
Andy Polack
@ALPSMAC
val observable = Observable.fromIterable(requestIds).mergeMap { id =>
  Observable.fromReactivePublisher(SSEPublisher).takeWhileInclusive { status => status != TaskStatus.DONE }
}.filter { status => status == TaskStatus.DONE }

val consumer = Consumer.foreachTask[TaskStatus]{ status => persistStatusToDatabase(status) }

val task = observable.consumeWith(consumer)

task.runToFuture

Expected Behavior: Each SSE client is disconnected once it observes the first 'DONE' status message on its channel. The consumer sees all DONE messages across all channels and persists each to the database. (Presuming here in this pseudocode that the status object has some sense of the "channel" it is carrying along with it.)

Actual Behavior: The first SSE channel to observe a DONE message wins, is committed to the database, and the stream appears to terminate.

I suspect I'm missing something fundamental here, but I figured since I was creating a bunch of Observables and mergeMap-ing them together, each with their own condition for termination (the takeWhileInclusive bit) that the end result would just be a stream of statuses from all channels, with the final message from each channel on the stream being the DONE event.

Oleg Pyzhcov
@oleg-py
Yeah, that's how I'd expect it to be. Could be a bug similar to one I fixed in #918.
Piotr Gawryś
@Avasil
And that's how it works in toy examples, e.g.
implicit val s = Scheduler.global

val observable = Observable.range(1, 6)
  .mergeMap { _ =>
    Observable.range(0, 5).takeWhileInclusive{ _ != 3 }
  }
  .filter{ _ == 3 }

val consumer = Consumer.foreachTask[Long]{ status => Task(println(s"persist $status")) }

val task = observable.consumeWith(consumer)

task.runSyncUnsafe()
Oleg Pyzhcov
@oleg-py
What if you throttle that inner observable?
and make observables terminate at different intervals?
Andy Polack
@ALPSMAC
Yeah - let me give that a shot and see what happens.
Oleg Pyzhcov
@oleg-py
I'm looking for ways to break it, not fix it :D
Piotr Gawryś
@Avasil
.mergeMap { i =>
    Observable.range(0, 5).delayOnNext(100 * i.millis).takeWhileInclusive{ _ != 3 }
  }
still works unfortunately
Andy Polack
@ALPSMAC
for what it's worth, I'm using an unbounded cached thread pool to back the Scheduler I'm using to dispatch the work.
Andy Polack
@ALPSMAC
Ah - actually I think maybe I'm onto something... I think persistStatusToDatabase(status) might be silently throwing an exception and killing the stream... not sure why the exception isn't being reported when I use observable.doOnError{...}, but maybe it's because the exception is being thrown within the Consumer.
Yep - that's the cause! Thanks for the troubleshooting assist - greatly appreciate it!
Also - that mergeMap is exactly what I was looking for and for some reason absolutely couldn't find it yesterday - so thanks for that hint too!
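The failure mode Andy hit - a side effect throwing inside the consumer and silently killing the stream - can be avoided by catching per-element errors instead of letting them escape. A library-free sketch of the idea (the persist function and its failing element are hypothetical stand-ins for persistStatusToDatabase):

```scala
import scala.util.{Failure, Success, Try}

object ErrorGuard {
  // Hypothetical persist that fails on one particular element
  def persist(status: Int): Unit =
    if (status == 3) throw new RuntimeException(s"cannot persist $status")

  // Guarding each call with Try keeps one bad element from
  // killing the whole run; returns (succeeded, failed) counts.
  def processAll(statuses: Seq[Int]): (Int, Int) = {
    var ok = 0
    var failed = 0
    statuses.foreach { s =>
      Try(persist(s)) match {
        case Success(_) => ok += 1
        case Failure(_) => failed += 1 // log and keep going
      }
    }
    (ok, failed)
  }
}
```

In Monix terms, the same guard would live inside the Consumer's Task (e.g. handling the error before it propagates), so a persistence failure is logged rather than terminating the stream.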
Piotr Gawryś
@Avasil
Awesome, glad to hear you managed to resolve your issue :D
Andy Polack
@ALPSMAC
Sincerely appreciate the prompt assistance - thanks @Avasil and @oleg-py !
Geovanny Junio
@geovannyjs

Hi, I am a newcomer to Monix and I am trying to do something like this:

lazy val io = Scheduler.io(name="my-io")

val source = Task { 
  //..blocking IO here, actually waiting for the user fingerprint, may wait for hours...
}

// execute in another logical thread
val task: CancelableFuture[Unit] = source.executeOn(io).asyncBoundary.runToFuture

// I changed my mind and don't want to wait
// so I am trying to cancel
task.cancel()  // <- IT IS NOT WORKING

Basically I have a blocking IO operation and I am trying to execute it on another logical thread, but if I try to cancel it, nothing happens.

Geovanny Junio
@geovannyjs

I just made a test here, and it works if I change to:

lazy val io = Scheduler.io(name="my-io")

val source = Task { 
  Thread.sleep(99999)
}

val task: CancelableFuture[Unit] = source.executeOn(io).asyncBoundary.runToFuture

task.cancel()

But when I put inside the Task the command to capture the fingerprint, it does not work:

val source = Task { 
  import scala.sys.process._
  "/path/to/my/fingerprint/capture".!!  // it is just a cli command that waits for the fingerprint and save it in an jpeg file
}

Could some light be shed?

Geovanny Junio
@geovannyjs
Even a shell command cannot be cancelled:
val source = Task { 
  import scala.sys.process._
  "sleep 99999".!! 
}
Ryan Peters
@sloshy
@geovannyjs IIRC - someone please correct me if I'm wrong - Monix implements cancellation in a similar way to cats-effect, in that there has to be a certain "boundary" for cancellation to happen. Every time this boundary is reached, the task checks whether it has been cancelled, and if so, stops executing. If you run a shell command, you cannot cancel it because there is no point at which the boundary is checked. Likewise, if you call Thread.sleep you are blocking the thread, so even if there were a boundary at which to check for cancellation, it would never be reached.
Monix has auto-cancellation on map/flatMap calls as an option
This blog post might be a bit outdated, but it lays out the difference in how they work: https://monix.io/blog/2018/03/20/monix-vs-cats-effect.html
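The "boundary" idea Ryan describes can be sketched without any effect library: cancellation is cooperative, and only takes effect at points where the computation actually checks a flag. A hypothetical illustration (CancelFlag and countTo are made-up names, not Monix API):

```scala
import java.util.concurrent.atomic.AtomicBoolean

object Cooperative {
  final class CancelFlag {
    private val c = new AtomicBoolean(false)
    def cancel(): Unit = c.set(true)
    def isCancelled: Boolean = c.get
  }

  // The loop checks the flag on every iteration:
  // each check is a cancellation boundary.
  def countTo(n: Int, flag: CancelFlag): Int = {
    var i = 0
    while (i < n && !flag.isCancelled) i += 1
    i
  }

  // By contrast, a blocking call like `"sleep 99999".!!` or Thread.sleep
  // sits *between* boundaries: setting the flag while it blocks changes
  // nothing until the call returns on its own.
}
```

This is why cancelling the Task wrapping a shell command has no visible effect: the blocked thread never reaches a point where the cancellation status is consulted.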
Ryan Peters
@sloshy
For sleeping - use Task.sleep instead, as it is non-blocking compared to Thread.sleep
Ryan Peters
@sloshy
If you're running a shell command you might want to consider either making it a background process that you can handle independently or using some kind of timeout to insert your own boundary to cancel on before retrying the command.
Geovanny Junio
@geovannyjs

If you're running a shell command you might want to consider either making it a background process that you can handle independently or using some kind of timeout to insert your own boundary to cancel on before retrying the command.

Thank you @sloshy :thumbsup: I will look for some docs of how to make it a background process and handle it independently

Piotr Gawryś
@Avasil
Keep in mind that even calling shellProcess.timeout(5.seconds) won't actually stop the process if it doesn't have any cancelation capability (it's not possible to cancel arbitrary synchronous actions). I've never used shell commands from Scala so I can't give you any details, but in your case it might be possible to send Ctrl+C there on cancelation
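To make the timeout actually terminate the external process, you have to keep the Process handle around and destroy it yourself when the deadline hits. A standard-library sketch of that pattern (KillOnTimeout and runWithDeadline are hypothetical names; the same destroy() call could be attached to a Task's cancelation instead):

```scala
import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration.FiniteDuration
import scala.sys.process._

object KillOnTimeout {
  def runWithDeadline(cmd: Seq[String], deadline: FiniteDuration): Option[Int] = {
    val proc = Process(cmd).run() // starts the process in the background
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val exit = Future(proc.exitValue()) // blocks on a dedicated thread
    try Some(Await.result(exit, deadline))
    catch {
      case _: TimeoutException =>
        proc.destroy() // actually terminate the OS process
        None
    } finally pool.shutdown()
  }
}
```

For example, `runWithDeadline(Seq("sleep", "99999"), 1.second)` returns None and the sleep process is gone, whereas merely timing out the wrapping computation would leave it running.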
Geovanny Junio
@geovannyjs
@Avasil thank you :thumbsup:
Geovanny Junio
@geovannyjs

I was trying to use Monix Task to run the command in the background, but I found another construct in scala.sys.process that already runs the process in the background and has a destroy method:

import scala.sys.process._
val cmd = "/my/shell/command".run()
cmd.destroy()

Anyway it was a good opportunity to be introduced to Monix, until yesterday it was unknow to me. Thank you guys @sloshy @Avasil I really appreciate your help :thumbsup: