Where communities thrive

  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
Repo info
  • Sep 05 2019 14:43
    @typelevel-bot banned @jdegoes
  • Jan 31 2019 21:17
    codecov-io commented #484
  • Jan 31 2019 21:08
    scala-steward opened #484
  • Jan 31 2019 18:19
    andywhite37 commented #189
  • Jan 31 2019 02:41
    kamilongus starred typelevel/cats-effect
  • Jan 30 2019 00:01
    codecov-io commented #483
  • Jan 29 2019 23:51
    deniszjukow opened #483
  • Jan 29 2019 23:37
  • Jan 29 2019 23:22
  • Jan 29 2019 20:26
    Rui-L starred typelevel/cats-effect
  • Jan 29 2019 18:01
    jdegoes commented #480
  • Jan 29 2019 17:04
    thomaav starred typelevel/cats-effect
  • Jan 28 2019 17:43
    asachdeva starred typelevel/cats-effect
  • Jan 28 2019 07:12
    alexandru commented #480
  • Jan 28 2019 05:45
    codecov-io commented #482
  • Jan 28 2019 05:35
    daron666 opened #482
  • Jan 27 2019 13:56
    codecov-io commented #481
  • Jan 27 2019 13:46
    lrodero opened #481
  • Jan 27 2019 05:47
    codecov-io commented #460
  • Jan 27 2019 05:37
    codecov-io commented #460
Daniel Spiewak

I don't know what's the current stance on this. In my opinion methods on IO improve user experience and allow optimization opportunities. People not using IntelliJ (or with better computer) can argue discoverability part because they can always import cats.implicits._ but that's not the case with me. IMO methods on IO + passing thread pool only at execution/evalOn (coming in 3.x I suppose) are two things that are really convenient for users.

This is my view of things. IMO IO should have specific versions of most of the common functions, even if we can inherit them from typeclasses, and the typeclass instances should override to point to our duplicates

There are a number of advantages to this, not the least of which is reducing the import tax.
It's like, for example, the fact that we define IO#map despite the fact that we could just inherit it from Functor via Monad. Part of that is for efficiency, of course, but there are other functions which are similar to this
Ivan Aristov
Hi all. I have some stupid question. I wanna run some racing IO using the Concurrent Object-companion directrly. So, the ContextShift that needs to this operation is the context for execution the background fibers or context for back down from fibers? Thanks!
The example code:
  import retry._
  import retry.CatsEffect._
  import cats.effect.IO.ioConcurrentEffect
  import cats.implicits._
  import scala.concurrent.duration._

  val policyDB: RetryPolicy[IO] = limitRetries[IO](5) join RetryPolicies.constantDelay(50 milliseconds)

  def handler(implicit log: LogAlg, sentry: SentryAlg): (Throwable, RetryDetails) => IO[Unit] = (ex, st) => {
    st match {
      case RetryDetails.GivingUp(retries, delay) =>
        val message = ex.getMessage
        val exc = new Throwable(s"DB policy is giving up after $retries retries and $delay delay with message: '$message'", ex)
      case RetryDetails.WillDelayAndRetry(delay, retries, overDelay) =>
        log.error(s"DB policy is so far got $retries retries with $overDelay over delay, start next try after $delay delay")

  def queryRunner[T](implicit timer: Timer[IO], cs: Const[ContextShift[IO], RaceDb], log: LogAlg, sentry: SentryAlg): Endo[IO[T]] = {
    (Concurrent.timeout(_: IO[T], 4 seconds)(ioConcurrentEffect(cs.getConst), timer)) >>>
    retryingOnAllErrors[T](policyDB, handler.apply)
Fabio Labella
the ContextShift is there for execution
Ivan Aristov
Thank you!
Fabio Labella
you can take Concurrent[IO] instead
of F[_]: Concurrent: Timer
which is arguably more correct
Concurrent is the typeclasses that provides racing


this is potentially weird, but I'm going to assume you have a good reason for it :)

Ivan Aristov
I know this is bad, but i wanna run queries for DB it another ThreadPool.
Maybe i going to extract value from Const to another implicit value
Fabio Labella
ok, then no
that's not how you do it
forget about the Const
do not pass instances implicitly
but use evalOn around the call to the Db
  def foo[F[_]: Concurrent: ContextShift: Timer, A](query: IO[A], blocker: Blocker): IO[A] =
Ivan Aristov
Do you mean that i should use only one ContextShift per app and few ThreadPools for running some tasks directly to pools?
Oleg Pyzhcov
@Ssstlis yes
Fabio Labella
(the type above should return F[A], if you want to use IO it's the same but don't have the Concurrent constraint)
D Cameron Mauch
How would I go about creating an independent async worker that is cancellable, but otherwise never terminates? I specifically want a worker that listens to a RabbitMQ queue, and when it gets something, does some work. This worker will never return. But I want to be able to cleanly cancel it when the program is being shut down.
Fabio Labella
@DCameronMauch this is easier with fs2 tbh, but in cats-effect only you can basically start, and then you need to get a hold of the fiber and store it somewhere (e.g. a ref), and then take it from there and sequence into your finalisers with guarantee
Dmitry Polienko

@DCameronMauch, I have no experience with RabbitMQ, but I'd start with this:

  1. Find a way to pull events from the queue as an fs2 Stream. The first search result gave me something called fs2-rabbit.
  2. .evalTap your per-event logic on that.
  3. .compile.drain the whole thing to get an action that will never yield (assuming the queue isn't closed).
  4. Bundle that in a single action with any Resources it needs. E.g. val runWorker = mkEnv.use { env => pullEvents(env).evalTap(handleEvent(env, _)).compile.drain }.
  5. Put that into an IOApp. IOApp will cleanly cancel your program once it gets a corresponding signal, so you don't have to do anything special for that.
  6. Alternatively, if you already have an application and just want to add a background process, wrap your application in Resource.make(runWorker.start)(_.cancel).

After that I'd start looking at bottlenecks - e.g. replacing .evalTap with .parEvalMapUnordered.

Ryan Peters

@DCameronMauch You can use a cats.effect.concurrent.Ref[F, Boolean] (or a cats.effect.concurrent.Deferred[F, Boolean] but this might be more complex, less so with TryableDeferred) as a signal of sorts to check at certain points in your code. For Refor TryableDeferred that would involve checking it at various points in your code.

FWIW a lot of this use case is handled by libraries such as FS2, with Stream.resource handling the "cleanly" part of cancellation (closing resources, etc) and .interruptWhen(signal) cancelling your stream.

Fabio Labella
if you actually use fs2, you can just use concurrently to embed exactly this pattern
a background process which however gets cleanly cancelled on termination
no other machinery needed
D Cameron Mauch
Thanks all!
Gavin Bisesi
@DCameronMauch Have you looked at fs2-rabbit?
yay gitter for not updating
D Cameron Mauch
I have not, but I will
Gavin Bisesi
missed a bunch of chat
@DCameronMauch We use it in production for "do work from rabbit queue" and it works just fine
D Cameron Mauch
sweet, thanks for the pointer
didn’t realize there was already something out there for my use case
Gavin Bisesi
Even if it's not a clean fit for your need it should save you a lot of work just from the cats-effect wrapping it already provides
but I'm 99% sure it solves it for you
fs2 is a good base for when you want
  • to do some kind of work on elements or batches of elements
  • you want some kind of pub/sub interaction, queues etc
  • you want to encode more complex concurrency behaviors than IO makes easy
  • you have a lot of IO behaviors and you want to compose them more orthogonally than IO makes easy
The c-e ecosystem is pretty broad at this point; I highly recommend googling "$task cats-effect" or "$task fs2" when you need a tool. Chances are you'll find something, or at least confirm that you need to do it yourself
Assen Kolov
What can I be doing wrong to get an [error] ... Symbol 'type cats.Parallel.Aux' is missing from the classpath. [error] This symbol is required by 'method cats.effect.IOInstances.ioParallel' on implicit val ec = implicitly[ExecutionContext] implicit val cs: ContextShift[IO] = IO.contextShift(ec) val sync = implicitly[Sync[IO]]
I see cats-effects 1.5.0 is evicted by 2.0.0 in the project, but in this code excerpt it is the same IO, isn't it?
Paul Snively
No; that's your issue. Something that depends on cats-effect 1.5.0 is getting 2.0.0 instead.
Assen Kolov

That is the whole class: ```class ClientTestEnv extends AsyncFlatSpec with Matchers with BeforeAndAfterAll {

implicit val ec = implicitly[ExecutionContext]
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
val sync = implicitly[Sync[IO]] }```

Paul Snively
Of course you have a cats-effect dependency. The question is, what's evicting 1.5.0 in favor of 2.0.0?
Assen Kolov
Me, I defined cats-effect 2.0.0 in sbt. Actually, cats-core 1.5.0 is evicted in favour of 2.0.0, no cats-effect conflicts.
Paul Snively
Well, you can’t do that without upgrading the library with the 1.5.0 dependency.
Assen Kolov
it comes through circe 0.9. Do you think there's any circe code executed in the class above?
Paul Snively
You need to upgrade Circe, then.