@DCameronMauch, I have no experience with RabbitMQ, but I'd start with this:
- Get the events as an fs2 Stream. The first search result gave me something called fs2-rabbit.
- .evalTap your per-event logic on that.
- .compile.drain the whole thing to get an action that will never yield (assuming the queue isn't closed).
- Wrap it in whatever Resources it needs. E.g. val runWorker = mkEnv.use { env => pullEvents(env).evalTap(handleEvent(env, _)).compile.drain }
- Run it in an IOApp. IOApp will cleanly cancel your program once it gets a corresponding signal, so you don't have to do anything special for that.
- To run it in the background instead, use Resource.make(runWorker.start)(_.cancel).
After that I'd start looking at bottlenecks - e.g. replacing .evalTap with .parEvalMapUnordered.
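A minimal sketch of the steps above, assuming cats-effect 2.x and fs2 2.x. Env, Event, and the bodies of mkEnv, pullEvents and handleEvent are hypothetical stand-ins (a real pullEvents would come from a RabbitMQ client such as fs2-rabbit, which is not shown here); only the wiring is the point.

```scala
import cats.effect.{ExitCode, Fiber, IO, IOApp, Resource}
import fs2.Stream

object Worker extends IOApp {
  // Hypothetical stand-ins for the real environment and event types.
  final case class Env(name: String)
  final case class Event(id: Int)

  // Assumption: the environment is acquired and released as a Resource.
  val mkEnv: Resource[IO, Env] =
    Resource.make(IO(Env("demo")))(_ => IO(println("env released")))

  // Assumption: a finite in-memory stream stands in for the queue consumer.
  def pullEvents(env: Env): Stream[IO, Event] =
    Stream.range(0, 5).map(Event(_)).covary[IO]

  def handleEvent(env: Env, event: Event): IO[Unit] =
    IO(println(s"${env.name} handled event ${event.id}"))

  // Run the per-event logic for every event, discarding the stream's output.
  val runWorker: IO[Unit] =
    mkEnv.use { env =>
      pullEvents(env).evalTap(handleEvent(env, _)).compile.drain
    }

  // Background variant: the worker fiber is cancelled when the Resource is released.
  val backgroundWorker: Resource[IO, Fiber[IO, Unit]] =
    Resource.make(runWorker.start)(_.cancel)

  def run(args: List[String]): IO[ExitCode] =
    runWorker.as(ExitCode.Success)
}
```

If handling turns out to be the bottleneck, swapping .evalTap(handleEvent(env, _)) for .parEvalMapUnordered(n)(handleEvent(env, _)) runs up to n handlers concurrently, at the cost of losing event order.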
@DCameronMauch You can use a cats.effect.concurrent.Ref[F, Boolean] (or a cats.effect.concurrent.Deferred[F, Boolean] but this might be more complex, less so with TryableDeferred) as a signal of sorts to check at certain points in your code. For Ref or TryableDeferred that would involve checking it at various points in your code.
FWIW a lot of this use case is handled by libraries such as FS2, with Stream.resource handling the "cleanly" part of cancellation (closing resources, etc) and .interruptWhen(signal) cancelling your stream.
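A rough sketch of the FS2 route, assuming fs2 2.x and cats-effect 2.x. The managed resource and the printed messages are made up for illustration; SignallingRef is one possible choice for the signal.

```scala
import cats.effect.{ExitCode, IO, IOApp, Resource}
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.duration._

object InterruptDemo extends IOApp {
  // Hypothetical resource; the release action runs when the stream is interrupted.
  val managed: Resource[IO, String] =
    Resource.make(IO { println("acquire"); "conn" })(_ => IO(println("release")))

  // Does some periodic work with the resource until the signal becomes true.
  def worker(stop: SignallingRef[IO, Boolean]): Stream[IO, Unit] =
    Stream
      .resource(managed)
      .flatMap { conn =>
        Stream.awakeEvery[IO](100.millis).evalMap(_ => IO(println(s"working with $conn")))
      }
      .interruptWhen(stop)

  def run(args: List[String]): IO[ExitCode] =
    for {
      stop  <- SignallingRef[IO, Boolean](false)
      fiber <- worker(stop).compile.drain.start
      _     <- IO.sleep(350.millis)
      _     <- stop.set(true) // request a stop; the resource is released afterwards
      _     <- fiber.join     // wait for the stream to finish cleanly
    } yield ExitCode.Success
}
```

Compared with polling a Ref by hand, the interruption check here is done by the stream itself, so the per-event logic does not need to know about the signal.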
IO behaviors and you want to compose them more orthogonally than IO makes easy
[error] ... Symbol 'type cats.Parallel.Aux' is missing from the classpath.
[error] This symbol is required by 'method cats.effect.IOInstances.ioParallel'
on
implicit val ec = implicitly[ExecutionContext]
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
val sync = implicitly[Sync[IO]]
IO, isn't it?
extends
in the same project very similar code compiles without problems. I'll look further tomorrow.
If I have an IO with a timeout:
IO(somefn).timeout(100 millis)
Is it possible for somefn to create an IO which doesn't get timed out?
def somefn = {
readDb >> computeSomething >> fireOffaTask >> updateDb
}
I'm trying to have the fireOffaTask run without being affected by the timeout.
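One possible approach, sketched under the assumption of cats-effect 2.x: start fireOffaTask on its own fiber with .start, so cancelling the timed-out parent does not cancel it. The bodies of readDb, computeSomething, fireOffaTask and updateDb below are placeholders with made-up delays. Note that the timeout here is applied to somefn directly; IO(somefn).timeout(...) would only time the construction of the inner IO, not its execution.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import scala.concurrent.duration._

object TimeoutDemo extends IOApp {
  // Placeholder effects; the sleeps only exist to make the timeout fire.
  val readDb: IO[Unit]           = IO(println("readDb"))
  val computeSomething: IO[Unit] = IO(println("computeSomething"))
  val fireOffaTask: IO[Unit]     = IO.sleep(300.millis) >> IO(println("task finished"))
  val updateDb: IO[Unit]         = IO.sleep(1.second) >> IO(println("updateDb"))

  // .start forks the task onto a separate fiber and returns immediately,
  // so the task is no longer part of the effect that gets timed out.
  def somefn: IO[Unit] =
    readDb >> computeSomething >> fireOffaTask.start >> updateDb

  def run(args: List[String]): IO[ExitCode] =
    for {
      result <- somefn.timeout(100.millis).attempt // fails with a TimeoutException
      _      <- IO(println(s"somefn result: $result"))
      _      <- IO.sleep(500.millis)               // leave time for the forked task
    } yield ExitCode.Success
}
```

The trade-off is that nothing is waiting on the forked fiber any more, so any error raised inside fireOffaTask goes unobserved unless the task handles or logs it itself.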