Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Andy Polack
@ALPSMAC
Nope - determined at runtime
Monitoring a bunch of asynchronous processes that are tracking status of long-running tasks kicked off based on an out-of-band REST request.
I did look at using something like the zipMap operation... but found I ran into the problem where I don't know the number of SSEs until runtime.
Piotr Gawryś
@Avasil
There is also zipList variant if you have a list. Does the number of SSE changes dynamically or you know at some point (in runtime) that it's n SSE requests to run until all of them emit given element? In case of the latter it should be relatively easy to solve with semaphore
Oleg Pyzhcov
@oleg-py
@ALPSMAC why not then use takeWhile on each individual stream?
Piotr Gawryś
@Avasil
If my understanding is correct, he wants other streams to keep going until all of them emit specific item
Oleg Pyzhcov
@oleg-py
yeah, so you set the stop with takeWhile on each one and mergeMap them into a single stream.
Piotr Gawryś
@Avasil
Sorry, I meant all the streams to keep going (including those that emitted this item)
Andy Polack
@ALPSMAC
@Avasil and @oleg-py thanks for the feedback. Let me get a bit of pseudocode together to describe what I'm stumbling on in more detail.
Andy Polack
@ALPSMAC
val observable = Observable.fromIterable(requestIds).mergeMap{ ids => 
  Observable.fromReactivePublisher(SSEPublisher).takeWhileInclusive{ status => status != TaskStatus.DONE }
}.filter{ status => status == TaskStatus.DONE }

val consumer = Consumer.foreachTask[TaskStatus]{ status => persistStatusToDatabase(status) }

val task = observable.consumeWith(consumer)

task.runToFuture

Expected Behavior: Each SSE client is disconnected once it observes the first 'DONE' status message on its channel. The consumer sees all DONE messages across all channels and persists each to the database. (presuming here in this pseudocode that the status object has some sense of its "channel" it is carrying along with it.

Actual Behavior: The first SSE channel to observe a DONE message wins, is committed to the database, and the stream appears to terminate.

I suspect I'm missing something fundamental here, but I figured since I was creating a bunch of Observables and mergeMap-ing them together, each with their own condition for termination (the takeWhileInclusive bit) that the end result would just be a stream of statuses from all channels, with the final message from each channel on the stream being the DONE event.

Oleg Pyzhcov
@oleg-py
Yeah, that's how I'd expect it to be. Could be a bug similar to one I fixed in #918.
Piotr Gawryś
@Avasil
And that's how it works in toy examples, e.g.
implicit val s = Scheduler.global

val observable = Observable.range(1, 6)
  .mergeMap { _ =>
    Observable.range(0, 5).takeWhileInclusive{ _ != 3 }
  }
  .filter{ _ == 3 }

val consumer = Consumer.foreachTask[Long]{ status => Task(println(s"persist $status")) }

val task = observable.consumeWith(consumer)

task.runSyncUnsafe()
Oleg Pyzhcov
@oleg-py
What if you throttle that inner observable?
and make observables terminate at different intervals?
Andy Polack
@ALPSMAC
Yeah - let me give that a shot and see what happens.
Oleg Pyzhcov
@oleg-py
I'm looking for ways to break it, not fix it :D
Piotr Gawryś
@Avasil
.mergeMap { i =>
    Observable.range(0, 5).delayOnNext(100 * i.millis).takeWhileInclusive{ _ != 3 }
  }
still works unfortunately
Andy Polack
@ALPSMAC
for what it's worth I'm using an unbounded cached thread pool to back the Scheduler I'm using to dispatch the work on.
Andy Polack
@ALPSMAC
Ah - actually I think maybe I'm onto something.... I think persistStatusToDatabase(status) might be silently throwing an exception and killing the stream... not sure the exception isn't being reported when I use observable.doOnError{...} but maybe it's because the exception is being thrown within the Consumer.
Yep - that's the cause! Thanks for the troubleshooting assist - greatly appreciate it!
Also - that mergeMap is exactly what I was looking for and for some reason absolutely couldn't find it yesterday - so thanks for that hint too!
Piotr Gawryś
@Avasil
Awesome, glad to hear you managed to resolve your issue :D
Andy Polack
@ALPSMAC
Sincerely appreciate the prompt assistance - thanks @Avasil and @oleg-py !
Geovanny Junio
@geovannyjs

Hi, I am a newcomer in Monix and I am trying to do something like this:

lazy val io = Scheduler.io(name="my-io")

val source = Task { 
  //..blocking IO here, actually waiting for the user fingerprint, may wait for hours...
}

// execute in another logical thread
val task: CancelableFuture[Unit] = source.executeOn(io).asyncBoundary.runToFuture

// I changed my mind and don't want to wait
// so I am trying to cancel
task.cancel()  // <- IT IS NOT WORKING

Basically I have a block IO operation and I am trying to execute in another logical thread, but if I try to cancel it, nothing happens.

Geovanny Junio
@geovannyjs

I just made a test here, and it works if I change to:

lazy val io = Scheduler.io(name="my-io")

val source = Task { 
  Thread.sleep(99999)
}

val task: CancelableFuture[Unit] = source.executeOn(io).asyncBoundary.runToFuture

task.cancel()

But when I put inside the Task the comand to capture the fingerprint, it does not work:

val source = Task { 
  import scala.sys.process._
  "/path/to/my/fingerprint/capture".!!  // it is just a cli command that waits for the fingerprint and save it in an jpeg file
}

Some light do be shed?

Geovanny Junio
@geovannyjs
Even a shell command cannot be cancelled:
val source = Task { 
  import scala.sys.process._
  "sleep 99999".!! 
}
Ryan Peters
@sloshy
@geovannyjs IIRC - someone please correct me if I'm wrong - Monix implements cancellation in a similar way to cats-effect in that there has to be a certain "boundary" for cancellation to happen. Every time this boundary is reached, the task checks if it has been cancelled, and if so, stops executing. If you run a shell command, you cannot cancel it because there is no point where you are checking the boundary. Likewise if you call Thread.sleep you are blocking the thread, and even if there was a boundary for it to check if it was cancelled, it will never reach it.
Monix has auto-cancellation on map/flatMap calls as an option
This blog post might be a bit outdated, but it lays out the difference in how they work: https://monix.io/blog/2018/03/20/monix-vs-cats-effect.html
Ryan Peters
@sloshy
For sleeping - use Task.sleep instead, as it is non-blocking compared to Thread.sleep
Ryan Peters
@sloshy
If you're running a shell command you might want to consider either making it a background process that you can handle independently or using some kind of timeout to insert your own boundary to cancel on before retrying the command.
Geovanny Junio
@geovannyjs

If you're running a shell command you might want to consider either making it a background process that you can handle independently or using some kind of timeout to insert your own boundary to cancel on before retrying the command.

Thank you @sloshy :thumbsup: I will look for some docs of how to make it a background process and handle it independently

Piotr Gawryś
@Avasil
Keep in mind that even calling shellProcess.timeout(5.second) won't actually stop the process if it doesn't have any cancelation capability (it's not possible to cancel arbitrary synchronous actions). I've never used shell commands from Scala so can't give you any details but in your case it might be possible to send CTRL + C there on cancelation
Geovanny Junio
@geovannyjs
@Avasil thank you :thumbsup:
Geovanny Junio
@geovannyjs

I was trying to use Monix.Task to run the command in background, but I found out another construction of scala.sys.process that already runs the process in the background and has a destroy method:

import scala.sys.process._
val cmd = "/my/shell/command".run()
cmd.destroy()

Anyway it was a good opportunity to be introduced to Monix, until yesterday it was unknow to me. Thank you guys @sloshy @Avasil I really appreciate your help :thumbsup:

Ryan Peters
@sloshy
Huh, TIL you can destroy commands. Good to know @geovannyjs
Jake Schwartz
@jakehschwartz_twitter

Hey, Im switching to monix and trying to figure out the best way to keep my program running in between the runs of my scheduler. I currently have:

  val _ = scheduler.scheduleAtFixedRate(1, 120, TimeUnit.SECONDS, () => { ... })
  while (true) { Thread.sleep(1000000)}

While this works, I didnt know if there was a better way to do it using the library. Thanks!

Piotr Gawryś
@Avasil

You can set threads in Scheduler to not be daemonic, let's say:

  implicit val scheduler = Scheduler.fixedPool("my-pool", 4, daemonic = false)

  val _ = scheduler.scheduleAtFixedRate(1, 120, TimeUnit.MILLISECONDS, () => println("a"))

Alternatively, if you want to be pure, you can use Task for your scheduled actions. To schedule Task you can write a recursive function or just use a stream like Observable from monix-reactive, Iterant from monix-tail, fs2.Stream (different library) or whatever you prefer, like this:

https://monix.io/docs/3x/reactive/observable.html#intervalatfixedrate

Then you just use TaskApp in Main or call task.runSyncUnsafe() yourself

Pavel Perikov
@p-pavel

Hi. Supposing I have some side effectual function (e.g.

log(msg: Any): Unit

) is there any reason I'd better run the invocation in Task in situations like

val s: Observable[String] = ???
   s.map(log) //vs s.evalMap(msg => Task(log(msg)))

?

Paweł Kiersznowski
@pk044
hey guys, i wanted to contribute to Monix so I decided to take a look at this issue: monix/monix#957 - i already implemented the function, but i've been thinking a lot about what should be tested
I wonder if writing a similar test to the one testing async execution (in AsyncStateActionObservableSuite) would be a good approach? perhaps do you have any suggestions how I should test it?
Piotr Gawryś
@Avasil
@pk044 in this case I think it's enough to just put 1 sanity check test in AsyncStateActionObservableSuite that it works for cats.effect.IO, it's like this in DoOnNextSuite
Alexandru Nedelcu
@alexandru

Hi all, I’m looking for input on this PR: monix/monix#960

cc @mdedetrich any thoughts?

Ryan Peters
@sloshy
@p-pavel see this conversation I had on the cats-effect gitter https://gitter.im/typelevel/cats-effect?at=5d3b31736ec2e14223f4ed5c
TL;DR you want to use map for pure functions. Side-effects ideally should always be suspended
It guarantees that they execute when and how you specify (across effect system implementations too)
Alexandru Nedelcu
@alexandru

@sloshy @p-pavel indeed, the implementation of Task and Observable currently catch errors and are able to suspend the execution in map and flatMap, however it’s best if the side effects are clearly documented via a proper type.

In our code, when you see Task, you know that it has a side effect, whereas if you don’t then you know that the function is pure and won’t launch your nuclear missiles.

@pk044 yes, adding a test similar to that of Observable.fromAsyncStateAction would be enough, and instead of Task use IO
Paweł Kiersznowski
@pk044
@Avasil @alexandru thanks for the response - going to give it a try right now :)
Pavel Perikov
@p-pavel
@sloshy thanks that exactly what I thought of