Future is an eagerly evaluated structure. Each time a flatMap/map is triggered, the work is automatically dispatched onto another thread - it forces a context switch, which requires a minimum of 2 threads in the pool to execute the work reliably. I think of it as something akin to Spark Streaming (which requires a minimum of 2 Spark cores allocated in order to propagate elements downstream). Consequently, I think - although I could be mistaken - that if you instead replaced the Future with Monix Task or Cats-Effect IO here, you could avoid the eager evaluation and effectively end up with the same implementation you described - "await" on a, "await" on b, "await" on r1 * r2, all on a single thread.
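A minimal sketch of what that could look like with Monix Task (assuming Monix 3.x): the same single-thread constraint as the Future example below, but since Task is lazy the flatMap chain just runs sequentially, with no second thread needed:
object LazySingleThreadMain extends App {
  import monix.eval.Task
  import monix.execution.Scheduler

  // One thread, analogous to newFixedThreadPool(1) below.
  implicit val s = Scheduler.singleThread(name = "single")

  def addOne(x: Int): Task[Int] = Task {
    println(s"addOne($x) — ${Thread.currentThread().getName}")
    x + 1
  }

  def multiply(x: Int, y: Int): Task[Int] =
    for {
      r1 <- addOne(x)
      r2 <- addOne(y)
    } yield r1 * r2

  // Nothing runs until here; the chain evaluates sequentially on the
  // one thread, so there is no Await and nothing to deadlock on.
  println(multiply(2, 3).runSyncUnsafe())
}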
@mmgalea_twitter try running this:
object DeadlockedMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  implicit val ec = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(1))

  def addOne(x: Int) = Future {
    println(s"addOne($x) — ${Thread.currentThread().getName}")
    x + 1
  }

  def multiply(x: Int, y: Int) = Future {
    println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
    val a = addOne(x)
    val b = addOne(y)
    val result = for {
      r1 <- a
      r2 <- b
    } yield r1 * r2
    // This dead-locks: the pool's only thread is blocked here in Await,
    // so the addOne futures (and the flatMap continuations) can never run.
    Await.result(result, Duration.Inf)
  }

  multiply(2, 3)
}
I just added printlns with thread names, and you'll see that the Futures spawned by addOne inside multiply(2, 3) never even get a chance to execute. Thanks
object FixedMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  implicit val ec = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(2))

  def addOne(x: Int) = Future {
    println(s"addOne($x) — ${Thread.currentThread().getName}")
    x + 1
  }

  def multiply(x: Int, y: Int) = Future {
    println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
    val a = addOne(x)
    val b = addOne(y)
    val result = for {
      r1 <- a
      r2 <- b
    } yield r1 * r2
    // With 2 threads this no longer dead-locks: while this thread
    // blocks in Await, the second thread can run the addOne futures.
    Await.result(result, Duration.Inf)
  }

  multiply(2, 3)
}
This is quite informative, seeing which threads are used.
object SeparatePoolsMain extends App {
  import java.util.concurrent.Executors
  import scala.concurrent._
  import scala.concurrent.duration.Duration

  // One pool dedicated to blocking calls, one for CPU-bound work.
  val ecBlocking = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(1))
  val ecCPU = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  def addOne(x: Int) =
    Future {
      println(s"addOne($x) — ${Thread.currentThread().getName}")
      x + 1
    }(ecCPU)

  def multiply(x: Int, y: Int) =
    Future {
      println(s"multiply($x, $y) — ${Thread.currentThread().getName}")
      val a = addOne(x)
      val b = addOne(y)
      implicit val neededForFlatMap = ecCPU
      val result = for {
        r1 <- a
        r2 <- b
      } yield r1 * r2
      // No dead-lock here: the Await only blocks the dedicated blocking
      // pool, while addOne and the flatMaps run on ecCPU.
      Await.result(result, Duration.Inf)
    }(ecBlocking)

  multiply(2, 3)
}
If you flatMap to a stream that is using SSE, it won't process the next element until the current one is complete (that's how flatMap works for streams in general). If the SSE takes very long, or forever, you can consider running a few of them in parallel:
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._

Observable
  .fromIterable(0 to 10)
  .flatMap(i => Observable.evalDelayed(1.second, println(i)))
  .completedL
  .runSyncUnsafe() // will print 1 per second

Observable
  .fromIterable(0 to 10)
  .mergeMap(i => Observable.evalDelayed(1.second, println(i)))
  .completedL
  .runSyncUnsafe() // will print everything after 1 second
mergeMap was exactly what I was looking for. SSE does take forever - basically subscribing to an event stream until a particular event is observed, at which point the client should choose to disconnect. I'm monitoring multiple SSE endpoints for events like this, trying to do it with Observables and having trouble expressing it (mergeMap seems to be the proper combinator for this... thank you!!). I tried a takeWhile approach on the Observable wrapping the SSEs, but due to the concurrent merge the stream ends when the first SSE sees the event I'm looking for rather than when ALL SSEs have seen the given event... (I see a forall combinator, but I'm not 100% sure how to make use of it.)
val observable = Observable.fromIterable(requestIds).mergeMap { id =>
  Observable.fromReactivePublisher(SSEPublisher).takeWhileInclusive { status => status != TaskStatus.DONE }
}.filter { status => status == TaskStatus.DONE }

val consumer = Consumer.foreachTask[TaskStatus] { status => persistStatusToDatabase(status) }

val task = observable.consumeWith(consumer)
task.runToFuture
Expected Behavior: Each SSE client is disconnected once it observes the first DONE status message on its channel. The consumer sees all DONE messages across all channels and persists each to the database. (Presuming here in this pseudocode that the status object carries along some sense of which "channel" it came from.)
Actual Behavior: The first SSE channel to observe a DONE message wins, is committed to the database, and the stream appears to terminate.
I suspect I'm missing something fundamental here, but I figured that since I was creating a bunch of Observables and mergeMap-ing them together, each with its own condition for termination (the takeWhileInclusive bit), the end result would just be a stream of statuses from all channels, with the final message from each channel on the stream being the DONE event.
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.{Consumer, Observable}

implicit val s = Scheduler.global

val observable = Observable.range(1, 6)
  .mergeMap { _ =>
    Observable.range(0, 5).takeWhileInclusive { _ != 3 }
  }
  .filter { _ == 3 }

val consumer = Consumer.foreachTask[Long] { status => Task(println(s"persist $status")) }

val task = observable.consumeWith(consumer)
task.runSyncUnsafe()
persistStatusToDatabase(status) might be silently throwing an exception and killing the stream... I'm not sure why the exception isn't being reported when I use observable.doOnError{...}, but maybe it's because the exception is being thrown within the Consumer.
mergeMap is exactly what I was looking for, and for some reason I absolutely couldn't find it yesterday - so thanks for that hint too!
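One way to test that suspicion (a sketch assuming Monix 3.x; persistStatusToDatabase is stubbed out here with a hypothetical version that throws): wrap the side effect in Task and recover per element, so a throwing persist surfaces in the logs instead of silently ending the stream:
object SafeConsumerDemo extends App {
  import monix.eval.Task
  import monix.execution.Scheduler.Implicits.global
  import monix.reactive.{Consumer, Observable}

  // Stub standing in for the real persist call; throws on even statuses.
  def persistStatusToDatabase(status: Long): Unit =
    if (status % 2 == 0) throw new RuntimeException(s"db down for $status")
    else println(s"persisted $status")

  // Recover per element so one failed persist doesn't kill the stream.
  val consumer = Consumer.foreachTask[Long] { status =>
    Task(persistStatusToDatabase(status))
      .onErrorHandle(e => println(s"persist failed: ${e.getMessage}"))
  }

  Observable.range(0, 5).consumeWith(consumer).runSyncUnsafe()
}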
Hi, I am a newcomer to Monix and I am trying to do something like this:
import monix.eval.Task
import monix.execution.{CancelableFuture, Scheduler}

implicit lazy val io = Scheduler.io(name = "my-io")

val source = Task {
  // ..blocking IO here, actually waiting for the user fingerprint, may wait for hours...
}

// execute in another logical thread
val task: CancelableFuture[Unit] = source.executeOn(io).asyncBoundary.runToFuture

// I changed my mind and don't want to wait,
// so I am trying to cancel
task.cancel() // <- IT IS NOT WORKING
Basically I have a blocking IO operation and I am trying to execute it in another logical thread, but if I try to cancel it, nothing happens.
I just made a test here, and it works if I change it to:
implicit lazy val io = Scheduler.io(name = "my-io")

val source = Task {
  Thread.sleep(99999)
}

val task: CancelableFuture[Unit] = source.executeOn(io).asyncBoundary.runToFuture
task.cancel()
But when I put the command to capture the fingerprint inside the Task, it does not work:
val source = Task {
  import scala.sys.process._
  // just a cli command that waits for the fingerprint and saves it to a jpeg file
  "/path/to/my/fingerprint/capture".!!
}
Can some light be shed?
With Thread.sleep you are blocking the thread, and even if there were a boundary for it to check whether it was cancelled, it would never reach it.
If you're running a shell command, you might want to consider either making it a background process that you can handle independently, or using some kind of timeout to insert your own boundary to cancel on before retrying the command.
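For illustration (a sketch, assuming Monix 3.x): Task.sleep suspends via the scheduler instead of parking a thread, which is why it cooperates with cancelation where Thread.sleep cannot:
object CancelableSleepDemo extends App {
  import monix.eval.Task
  import monix.execution.Scheduler.Implicits.global
  import scala.concurrent.duration._

  // Cooperative sleep: no thread is parked, so cancelation takes effect.
  val cooperative = Task.sleep(99999.seconds)

  val running = cooperative.runToFuture
  running.cancel() // the scheduled tick is discarded; nothing is left running

  // By contrast, Task(Thread.sleep(99999)) keeps a pool thread parked
  // even after cancel, because nothing interrupts the sleeping thread.
}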
Thank you @sloshy :thumbsup: I will look for some docs on how to make it a background process and handle it independently
shellProcess.timeout(5.second) won't actually stop the process if it doesn't have any cancelation capability (it's not possible to cancel arbitrary synchronous actions). I've never used shell commands from Scala so I can't give you any details, but in your case it might be possible to send CTRL + C there on cancelation
I was trying to use Monix's Task to run the command in the background, but I found another construct in scala.sys.process that already runs the process in the background and has a destroy method:
import scala.sys.process._
val cmd = "/my/shell/command".run()
cmd.destroy()
Anyway, it was a good opportunity to be introduced to Monix - until yesterday it was unknown to me. Thank you guys @sloshy @Avasil, I really appreciate your help :thumbsup:
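The two approaches can also be combined - a sketch (assuming Monix 3.x, with a Unix sleep command standing in for the capture binary) that ties Process.destroy to Task cancelation, so task.cancel() actually kills the process:
object CancelableProcessDemo extends App {
  import monix.eval.Task
  import monix.execution.Scheduler
  import scala.sys.process._

  implicit val io = Scheduler.io(name = "my-io")

  // Start the process lazily, then block on its exit code; doOnCancel
  // registers destroy() as the finalizer to run if the task is cancelled.
  val source: Task[Int] =
    Task("sleep 99999".run()).flatMap { process =>
      Task(process.exitValue())
        .doOnCancel(Task(process.destroy()))
    }

  val task = source.executeOn(io).runToFuture
  Thread.sleep(1000)
  task.cancel() // destroy() makes exitValue() return, freeing the io thread
}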
Hey, I'm switching to Monix and trying to figure out the best way to keep my program running in between the runs of my scheduler. I currently have:
val _ = scheduler.scheduleAtFixedRate(1, 120, TimeUnit.SECONDS, () => { ... })
while (true) { Thread.sleep(1000000) }
While this works, I didn't know if there was a better way to do it using the library. Thanks!
You can set the threads in the Scheduler to not be daemonic, let's say:
implicit val scheduler = Scheduler.fixedPool("my-pool", 4, daemonic = false)
val _ = scheduler.scheduleAtFixedRate(1, 120, TimeUnit.MILLISECONDS, () => println("a"))
Alternatively, if you want to be pure, you can use Task for your scheduled actions. To schedule a Task you can write a recursive function, or just use a stream like Observable from monix-reactive, Iterant from monix-tail, fs2.Stream (a different library), or whatever you prefer, like this:
https://monix.io/docs/3x/reactive/observable.html#intervalatfixedrate
Then you just use TaskApp in your Main, or call task.runSyncUnsafe() yourself
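A sketch of the Observable route (assuming Monix 3.x, where TaskApp keeps the JVM alive until the returned Task completes):
import cats.effect.ExitCode
import monix.eval.{Task, TaskApp}
import monix.reactive.Observable
import scala.concurrent.duration._

object ScheduledMain extends TaskApp {
  def run(args: List[String]): Task[ExitCode] =
    Observable
      .intervalAtFixedRate(1.second, 120.seconds)
      .mapEval(_ => Task(println("tick"))) // your scheduled action goes here
      .completedL // this Task never completes, so the program keeps running
      .map(_ => ExitCode.Success)
}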