Dmitry Polienko

@DCameronMauch, I have no experience with RabbitMQ, but I'd start with this:

  1. Find a way to pull events from the queue as an fs2 Stream. The first search result gave me something called fs2-rabbit.
  2. .evalTap your per-event logic on that.
  3. .compile.drain the whole thing to get an action that will never yield (assuming the queue isn't closed).
  4. Bundle that in a single action with any Resources it needs. E.g. val runWorker = mkEnv.use { env => pullEvents(env).evalTap(handleEvent(env, _)).compile.drain }.
  5. Put that into an IOApp. IOApp will cleanly cancel your program once it gets a corresponding signal, so you don't have to do anything special for that.
  6. Alternatively, if you already have an application and just want to add a background process, wrap your application in Resource.make(runWorker.start)(_.cancel).

After that I'd start looking at bottlenecks - e.g. replacing .evalTap with .parEvalMapUnordered.
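The steps above can be sketched roughly as follows. This assumes cats-effect 2.x and fs2 2.x; `Env`, `Event`, and the bodies of `mkEnv`, `pullEvents`, and `handleEvent` are hypothetical stand-ins (a real `pullEvents` would come from a queue client and would not be finite).

```scala
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2.Stream

object Worker extends IOApp {
  // Hypothetical types standing in for a real queue client and its messages.
  final case class Env(name: String)
  final case class Event(id: Long)

  // Step 4: whatever the worker needs, acquired and released as a Resource.
  def mkEnv: Resource[IO, Env] =
    Resource.make(IO(Env("demo")))(env => IO(println(s"released ${env.name}")))

  // Step 1: a finite stand-in for the queue subscription.
  def pullEvents(env: Env): Stream[IO, Event] =
    Stream.emits(List(1L, 2L, 3L)).map(Event(_)).covary[IO]

  // Step 2: per-event logic.
  def handleEvent(env: Env, event: Event): IO[Unit] =
    IO(println(s"${env.name} handled event ${event.id}"))

  // Steps 3 and 4: one action that runs the stream inside its Resource.
  def runWorker: IO[Unit] =
    mkEnv.use { env =>
      pullEvents(env).evalTap(handleEvent(env, _)).compile.drain
    }

  // Step 6: the same worker as a background process tied to a Resource scope.
  def background: Resource[IO, Unit] =
    Resource.make(runWorker.start)(_.cancel).map(_ => ())

  // Step 5: IOApp takes care of cancellation on shutdown signals.
  def run(args: List[String]): IO[ExitCode] =
    runWorker.as(ExitCode.Success)
}
```

With `background`, an existing application would run as `background.use(_ => app)`, where `app` is whatever IO the application already has.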

Ryan Peters

@DCameronMauch You can use a cats.effect.concurrent.Ref[F, Boolean] (or a cats.effect.concurrent.Deferred[F, Boolean], but this might be more complex, less so with TryableDeferred) as a signal of sorts. For Ref or TryableDeferred, that would involve checking it at various points in your code.

FWIW a lot of this use case is handled by libraries such as FS2, with Stream.resource handling the "cleanly" part of cancellation (closing resources, etc) and .interruptWhen(signal) cancelling your stream.
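A minimal sketch of that FS2 approach, assuming cats-effect 2.x and fs2 2.x. The `connection` resource, the timings, and the use of `SignallingRef` as the signal are illustrative choices, not something prescribed above.

```scala
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.duration._

object InterruptDemo extends IOApp {
  // Hypothetical resource; a real one would be a connection or channel.
  def connection: Resource[IO, String] =
    Resource.make(IO("conn"))(c => IO(println(s"$c closed")))

  // Stream.resource ties the resource lifetime to the stream.
  def work: Stream[IO, Unit] =
    Stream.resource(connection).flatMap { conn =>
      Stream.awakeEvery[IO](100.millis).evalMap(_ => IO(println(s"working on $conn")))
    }

  def program: IO[Unit] =
    for {
      stop <- SignallingRef[IO, Boolean](false)
      // Flip the signal from another fiber after a while.
      _    <- (IO.sleep(350.millis) >> stop.set(true)).start
      // interruptWhen stops the stream once the signal is true;
      // the resource finalizer still runs.
      _    <- work.interruptWhen(stop).compile.drain
    } yield ()

  def run(args: List[String]): IO[ExitCode] =
    program.as(ExitCode.Success)
}
```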

Fabio Labella
if you actually use fs2, you can just use concurrently to embed exactly this pattern
a background process which however gets cleanly cancelled on termination
no other machinery needed
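For illustration, a small sketch of `concurrently`, assuming cats-effect 2.x and fs2 2.x. The `background` and `foreground` streams are made-up placeholders.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import fs2.Stream
import scala.concurrent.duration._

object ConcurrentlyDemo extends IOApp {
  // Hypothetical background process; on its own it would never terminate.
  def background: Stream[IO, Unit] =
    Stream.awakeEvery[IO](50.millis).evalMap(_ => IO(println("background tick")))

  // Hypothetical foreground work, emitting one element every 100 ms.
  def foreground: Stream[IO, Int] =
    Stream(0, 1, 2).covary[IO].metered(100.millis)

  // Only the foreground elements are emitted; once the foreground
  // finishes, the background stream is cancelled.
  def program: IO[List[Int]] =
    foreground.concurrently(background).compile.toList

  def run(args: List[String]): IO[ExitCode] =
    program.flatMap(xs => IO(println(xs))).as(ExitCode.Success)
}
```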
D Cameron Mauch
Thanks all!
Gavin Bisesi
@DCameronMauch Have you looked at fs2-rabbit?
yay gitter for not updating
D Cameron Mauch
I have not, but I will
Gavin Bisesi
missed a bunch of chat
@DCameronMauch We use it in production for "do work from rabbit queue" and it works just fine
D Cameron Mauch
sweet, thanks for the pointer
didn’t realize there was already something out there for my use case
Gavin Bisesi
Even if it's not a clean fit for your need it should save you a lot of work just from the cats-effect wrapping it already provides
but I'm 99% sure it solves it for you
fs2 is a good base for when you want
  • to do some kind of work on elements or batches of elements
  • some kind of pub/sub interaction, queues etc
  • to encode more complex concurrency behaviors than IO makes easy
  • to compose a lot of IO behaviors more orthogonally than IO makes easy
The c-e ecosystem is pretty broad at this point; I highly recommend googling "$task cats-effect" or "$task fs2" when you need a tool. Chances are you'll find something, or at least confirm that you need to do it yourself
Assen Kolov
What can I be doing wrong to get this error:

[error] ... Symbol 'type cats.Parallel.Aux' is missing from the classpath.
[error] This symbol is required by 'method cats.effect.IOInstances.ioParallel'

on this code:

implicit val ec = implicitly[ExecutionContext]
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
val sync = implicitly[Sync[IO]]
I see cats-effects 1.5.0 is evicted by 2.0.0 in the project, but in this code excerpt it is the same IO, isn't it?
Paul Snively
No; that's your issue. Something that depends on cats-effect 1.5.0 is getting 2.0.0 instead.
Assen Kolov

That is the whole class:

```
class ClientTestEnv extends AsyncFlatSpec with Matchers with BeforeAndAfterAll {
  implicit val ec = implicitly[ExecutionContext]
  implicit val cs: ContextShift[IO] = IO.contextShift(ec)
  val sync = implicitly[Sync[IO]]
}
```

Paul Snively
Of course you have a cats-effect dependency. The question is, what's evicting 1.5.0 in favor of 2.0.0?
Assen Kolov
Me, I defined cats-effect 2.0.0 in sbt. Actually, cats-core 1.5.0 is evicted in favour of 2.0.0, no cats-effect conflicts.
Paul Snively
Well, you can’t do that without upgrading the library with the 1.5.0 dependency.
Assen Kolov
it comes through circe 0.9. Do you think there's any circe code executed in the class above?
Paul Snively
You need to upgrade Circe, then.
Daniel Spiewak
actually there seems to be a more serious issue here
cats-effect should be binary compatible between 1.5 and 2.0, and cats itself should be as well, so the evictions shouldn't matter, but that's exactly what it looks like is happening
@kolov can you tell me what your cats-effect, cats, and circe versions are? circe 0.9 I take it?
Oleg Pyzhcov
it actually sounds like cats-core 1.5.0 is used instead of 2.0.0. Parallel.Aux - which is missing - is a cats 2 thing
Daniel Spiewak
yeah, so cats is backwards compatible, it is not forwards compatible
so if cats-effect is pushed forward to 2.0.0, then cats must be as well
cats-effect would not be compatible at all with cats-core 1.x
Assen Kolov
@djspiewak from sbt evicted : org.typelevel:cats-core_2.12:2.0.0 is selected over 1.5.0, io.circe:circe-core_2.12:0.12.1 is selected over 0.11.0, cats-effects is 2.0.0.
After I posted this I noticed that in another class with different extends in the same project very similar code compiles without problems. I'll look further tomorrow.
Paul Snively
OK, so something depends on circe-core 0.11.0, which depends on cats-core 1.5.0.
@kolov: If we can figure out what depends on circe-core 0.11.0, that'll help.
@kolov: I recommend the sbt-dependency-graph plugin.
Daniel Spiewak
sbt-coursier has an even better coursierDependencyTree command, which I've always found to be more helpful than sbt-dependency-graph. It's a bit weird now though given that coursier is baked into sbt 1.3.0+
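As a rough illustration of the constraint discussed above (cats-effect 2.0.0 needs cats-core 2.x on the classpath), a build.sbt fragment that keeps the versions aligned might look like this. The version numbers are the ones mentioned in this thread; the plugin coordinates and the `whatDependsOn` invocation in the comments are given from memory and should be checked against the plugin's README.

```scala
// build.sbt (sketch): keep cats-core, cats-effect and circe on the cats 2.x line.
libraryDependencies ++= Seq(
  "org.typelevel" %% "cats-core"   % "2.0.0",
  "org.typelevel" %% "cats-effect" % "2.0.0",
  "io.circe"      %% "circe-core"  % "0.12.1"
)

// project/plugins.sbt (assumed coordinates for sbt-dependency-graph):
//   addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")
//
// Then, from the sbt shell:
//   evicted                                           // lists evicted versions
//   whatDependsOn io.circe circe-core_2.12 0.11.0     // who pulls in the old circe
```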
Paul Snively

If I have an IO with a timeout:

IO(somefn).timeout(100 millis)

Is it possible for somefn to create an IO which doesn't get timed out?

def somefn = {
   readDb >> computeSomething >> fireOffaTask >> updateDb
}

I'm trying to have fireOffaTask run without being affected by the timeout.

I'm thinking to just use Future to fire it off, but I'd rather keep it IO if possible.
Gabriel Volpe
Maybe ioa.start.void? @davidnadeau
Not sure what you're trying to achieve exactly
Do you care about error handling in that task you want to run in the background?
Yes, but it can handle that itself.
I'm trying to implement a fire-and-forget that won't get a timeout exception.
Also, ioa.start.void will still get a timeout exception.
Gabriel Volpe
You would only get the timeout if the timeout happens before calling ioa.start.void
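A small sketch of that behaviour, assuming cats-effect 2.x. The bodies of `readDb`, `computeSomething`, `updateDb`, and `fireOffaTask` are placeholders, and the `Deferred` exists only so the example can observe the background task finishing. Note that the timeout is applied to `somefn` directly rather than to `IO(somefn)`.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.concurrent.Deferred
import cats.implicits._
import scala.concurrent.duration._

object FireAndForget extends IOApp {
  // Hypothetical stand-ins for the steps named in the question.
  def readDb: IO[Unit] = IO.unit
  def computeSomething: IO[Unit] = IO.unit
  def updateDb: IO[Unit] = IO.sleep(1.second) // slow enough to hit the timeout

  def somefn(done: Deferred[IO, Unit]): IO[Unit] = {
    // Placeholder background task: takes longer than the timeout, then signals.
    val fireOffaTask: IO[Unit] = IO.sleep(300.millis) >> done.complete(())
    // start forks a fiber; void discards the handle (fire and forget).
    readDb >> computeSomething >> fireOffaTask.start.void >> updateDb
  }

  // Returns true if somefn timed out; completes only after the
  // background task has also finished.
  def program: IO[Boolean] =
    for {
      done     <- Deferred[IO, Unit]
      timedOut <- somefn(done).timeout(100.millis).attempt.map(_.isLeft)
      _        <- done.get // the forked task was not cancelled by the timeout
    } yield timedOut

  def run(args: List[String]): IO[ExitCode] =
    program.flatMap(t => IO(println(s"timed out: $t"))).as(ExitCode.Success)
}
```

Here the timeout cancels the rest of `somefn` (the slow `updateDb`), while the already-started fiber keeps running on its own.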