F[_]: Concurrent: Timer
Concurrent is the typeclass that provides racing
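For what it's worth, here is a minimal sketch of what "racing" looks like with those two constraints, assuming cats-effect 2.x. The object name `RaceSketch` and the method `raceWithSleep` are made up for illustration; only `Concurrent[F].race` and `Timer[F].sleep` are library calls.

```scala
import cats.effect.{Concurrent, Timer}
import scala.concurrent.duration._

object RaceSketch {
  // Hypothetical helper: runs `fa` against a sleep of length `d`.
  // Left(a) means `fa` finished first; Right(()) means the sleep won
  // and `fa` was cancelled by the race.
  def raceWithSleep[F[_]: Concurrent: Timer, A](fa: F[A], d: FiniteDuration): F[Either[A, Unit]] =
    Concurrent[F].race(fa, Timer[F].sleep(d))
}
```

Timer only supplies the sleep here; the race itself comes from Concurrent.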
ioConcurrentEffect(cs.getConst)
this is potentially weird, but I'm going to assume you have a good reason for it :)
Const
evalOn around the call to the Db
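import cats.effect._, cats.effect.implicits._, scala.concurrent.duration._
// Run the query on the blocking pool, and give up after 3 seconds.
def foo[F[_]: Concurrent: ContextShift: Timer, A](query: F[A], blocker: Blocker): F[A] =
  blocker.blockOn(query).timeout(3.seconds)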
@DCameronMauch, I have no experience with RabbitMQ, but I'd start with this:
Stream. The first search result gave me something called fs2-rabbit.
.evalTap your per-event logic on that.
.compile.drain the whole thing to get an action that will never yield (assuming the queue isn't closed).
Resources it needs. E.g. val runWorker = mkEnv.use { env => pullEvents(env).evalTap(handleEvent(env, _)).compile.drain }.
IOApp. IOApp will cleanly cancel your program once it gets a corresponding signal, so you don't have to do anything special for that.
Resource.make(runWorker.start)(_.cancel).
After that I'd start looking at bottlenecks - e.g. replacing .evalTap with .parEvalMapUnordered.
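Put together, the steps above might look roughly like this. It is only a sketch, assuming cats-effect 2.x and fs2 2.x: `Env`, `mkEnv`, `pullEvents` and `handleEvent` are placeholders (the names come from the one-liner above, but their bodies here are invented), and a fixed three-element stream stands in for the real queue.

```scala
import cats.effect.{ExitCode, IO, IOApp, Resource}
import fs2.Stream

object Worker extends IOApp {
  // Placeholder environment; a real one would hold the queue connection etc.
  final case class Env(name: String)

  // Placeholder resource: acquire the environment, log on release.
  val mkEnv: Resource[IO, Env] =
    Resource.make(IO(Env("demo")))(_ => IO(println("env released")))

  // Placeholder event source; a real one would stream from the queue.
  def pullEvents(env: Env): Stream[IO, String] =
    Stream("a", "b", "c").covary[IO]

  // Placeholder per-event logic.
  def handleEvent(env: Env, event: String): IO[Unit] =
    IO(println(s"${env.name}: $event"))

  // Acquire resources, run the per-event logic, drain the stream.
  val runWorker: IO[Unit] =
    mkEnv.use { env =>
      pullEvents(env).evalTap(handleEvent(env, _)).compile.drain
    }

  // IOApp takes care of cancelling runWorker when the process is signalled.
  def run(args: List[String]): IO[ExitCode] =
    runWorker.as(ExitCode.Success)
}
```

With a real queue the stream would not end, so `runWorker` would only finish on cancellation or failure; here it ends after three events so the sketch can be run as-is.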
@DCameronMauch You can use a cats.effect.concurrent.Ref[F, Boolean] (or a cats.effect.concurrent.Deferred[F, Boolean], but this might be more complex, less so with TryableDeferred) as a signal of sorts. For Ref or TryableDeferred that would involve checking it explicitly at various points in your code.
FWIW a lot of this use case is handled by libraries such as FS2, with Stream.resource handling the "cleanly" part of cancellation (closing resources, etc) and .interruptWhen(signal) cancelling your stream.
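A rough sketch of the Ref-as-a-flag idea, assuming cats-effect 2.x (where Ref lives in cats.effect.concurrent). `StopFlag`, `step` and `loop` are made-up names, and the flag is flipped from inside the loop only to keep the example single-threaded; in practice another fiber would set it.

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref

object StopFlag {
  // Hypothetical unit of work.
  def step(i: Int): IO[Unit] = IO(println(s"step $i"))

  // Checks the flag before each step; returns how many steps ran.
  def loop(stop: Ref[IO, Boolean], done: Int, limit: Int): IO[Int] =
    stop.get.flatMap {
      case true => IO.pure(done)
      case false =>
        for {
          _ <- step(done)
          // Stand-in for "someone asked us to stop" after `limit` steps.
          _ <- if (done + 1 >= limit) stop.set(true) else IO.unit
          n <- loop(stop, done + 1, limit)
        } yield n
    }

  // Runs steps 0, 1 and 2, then sees the flag and stops.
  val demo: IO[Int] = Ref.of[IO, Boolean](false).flatMap(loop(_, 0, 3))
}
```

The loop only stops at the points where it reads the flag, which is the main difference from real cancellation or from .interruptWhen on a stream.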
IO makes easy
IO behaviors and you want to compose them more orthogonally than IO makes easy
[error] ... Symbol 'type cats.Parallel.Aux' is missing from the classpath.
[error] This symbol is required by 'method cats.effect.IOInstances.ioParallel'
on implicit val ec = implicitly[ExecutionContext]
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
val sync = implicitly[Sync[IO]]
IO, isn't it?