Async, so my async there is not idempotent (which it has to be)
Hello guys!)
I’ve started exploring the world of FP recently, and now I’m trying to build my first HTTP service with http4s.
And I get these debug messages in my console (on every request that Consul makes):
Request headers: Headers(Host: docker.for.mac.localhost:7000, User-Agent: Consul Health Check, Accept: text/plain, text/*, */*, Accept-Encoding: gzip, Connection: close)
[21:31:24.850] [scala-execution-context-global-40 ] DEBUG - closeConnection()
[21:31:24.850] [scala-execution-context-global-40 ] DEBUG - Shutting down HttpPipeline
[21:31:24.850] [scala-execution-context-global-40 ] DEBUG - Canceled request
[21:31:24.850] [scala-execution-context-global-40 ] DEBUG - Shutting down.
[21:31:24.851] [scala-execution-context-global-40 ] DEBUG - Shutting down HttpPipeline
[21:31:24.851] [scala-execution-context-global-40 ] DEBUG - Canceled request
[21:31:24.851] [scala-execution-context-global-40 ] DEBUG - Shutting down.
[21:31:24.851] [scala-execution-context-global-40 ] DEBUG - Shutting down idle timeout stage
[21:31:24.851] [scala-execution-context-global-40 ] DEBUG - Shutting down.
[21:31:24.851] [scala-execution-context-global-40 ] DEBUG - Shutting down.
[21:31:24.851] [blaze-selector-7 ] DEBUG - Stage NIO1HeadStage sending inbound command: Disconnected
[21:31:24.851] [blaze-selector-7 ] DEBUG - Shutting down idle timeout stage
[21:31:24.851] [blaze-selector-7 ] DEBUG - Shutting down.
[21:31:24.851] [blaze-selector-7 ] DEBUG - Stage IdleTimeoutStage sending inbound command: Disconnected
[21:31:24.851] [blaze-selector-7 ] DEBUG - Shutting down HttpPipeline
[21:31:24.851] [blaze-selector-7 ] DEBUG - Canceled request
[21:31:24.851] [blaze-selector-7 ] DEBUG - Shutting down.
My code looks like:
def apply[F[_]: Sync: Async](port: Int)(
    implicit cs: ConcurrentEffect[F],
    timer: Timer[F],
): F[Http4sPrometheusExporter] = {
  for {
    prometheusService <- Http4sPrometheusService.build[F]
    httpApp = Router[F]("/" -> prometheusService.routes).orNotFound
    server <- BlazeServerBuilder[F]
      .bindHttp(port, "0.0.0.0")
      .withHttpApp(httpApp)
      .resource
      .use(_ => Async[F].never[Unit])
      .start
  } yield new Http4sPrometheusExporter()
}
Is it OK, or am I doing something wrong?
Appreciate any help :)
Functor typeclass describes the behavior of map: F[A] => (A => B) => F[B] and laws like composition: fa.map(f).map(g) == fa.map(f andThen g)
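To make the composition law concrete, here is a minimal sketch that checks it for List and Option. The Functor trait below is defined locally for illustration (it is not the cats one), and compositionHolds is a hypothetical helper name.

```scala
// A locally defined Functor, so the snippet needs no library (assumption: illustrative only).
trait Functor[F[_]] {
  def map[A, B](fa: F[A])(f: A => B): F[B]
}

object Functor {
  implicit val listFunctor: Functor[List] = new Functor[List] {
    def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
  }
  implicit val optionFunctor: Functor[Option] = new Functor[Option] {
    def map[A, B](fa: Option[A])(f: A => B): Option[B] = fa.map(f)
  }
}

object FunctorLawDemo {
  // Composition law: mapping f and then g equals mapping (f andThen g) once.
  def compositionHolds[F[_], A, B, C](fa: F[A])(f: A => B, g: B => C)(
      implicit F: Functor[F]): Boolean =
    F.map(F.map(fa)(f))(g) == F.map(fa)(f andThen g)
}
```

A spot check like this only tests the law on particular values; it does not prove it for every instance.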
@vimalaguti you want to use ConcurrentEffect, like any other typeclass, when you don't want to be tied to a concrete IO in your signatures.
Specifically, Effect is used when you need to interop with some library that calls YOUR code, such as APIs that let you register a callback (almost everything in Scala.js), and Concurrent is used where you want flow control with fibers and .start, or for data structures with asynchronous blocking, like Semaphore, Deferred, fs2's Queue, Monix's ConcurrentChannel, etc.
ConcurrentEffect bundles the two to avoid ambiguity issues (they both extend Async, which would break extension methods), and also gives you a "run, but give me a cancel token" operation (runCancelable) on top of Effect.
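A rough sketch of the two roles, assuming cats-effect 2.x. The names TypeclassRoles, handoff, runForCallback and notifyCaller are made up for illustration; notifyCaller stands in for whatever callback a foreign API would hand you.

```scala
import cats.effect._
import cats.effect.concurrent.Deferred
import cats.effect.implicits._
import cats.implicits._

object TypeclassRoles {
  // Concurrent: fibers via .start, plus asynchronously blocking structures like Deferred.
  def handoff[F[_]: Concurrent]: F[Int] =
    for {
      slot  <- Deferred[F, Int]
      fiber <- slot.complete(42).start // completes the slot on another fiber
      value <- slot.get                // waits (without blocking a thread) until completed
      _     <- fiber.join
    } yield value

  // Effect: lets callback-style code run an F[A] without naming IO in the signature.
  // `notifyCaller` is a hypothetical callback supplied by the calling library.
  def runForCallback[F[_]: Effect](fa: F[Int])(
      notifyCaller: Either[Throwable, Int] => Unit): Unit =
    Effect[F].runAsync(fa)(result => IO(notifyCaller(result))).unsafeRunSync()
}
```

With IO, handoff needs an implicit ContextShift[IO] in scope (that is what provides Concurrent[IO]), while runForCallback works with plain IO.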
IO has some intricacies: you can't do io.start without an implicit ContextShift[IO] in scope, but you can always unsafeRunAsync it. Monix Task, by contrast, requires a Scheduler for unsafeRunAsync, but you can always do task.start. ConcurrentEffect lets you write code that is independent of these details and works for all effect types in the same way.
It isn't, however, a typeclass you want to use often. It literally says "here be dragons, anything can happen", and you can't use it with anything less powerful than IO+ContextShift or Task+Scheduler. Something that only requires Sync, for example, can be used with SyncIO or OptionT[IO, ?].
ConcurrentEffect is a typeclass of last resort.
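A small sketch of why the weaker constraint is worth it, assuming cats-effect 2.x. SyncOnly, describe and OptIO are illustrative names; the type alias is only there to avoid needing the kind-projector plugin for OptionT[IO, ?].

```scala
import cats.data.OptionT
import cats.effect.{IO, Sync, SyncIO}

object SyncOnly {
  // Alias standing in for OptionT[IO, ?].
  type OptIO[A] = OptionT[IO, A]

  // Only needs Sync, so it runs in any effect that can suspend side effects.
  def describe[F[_]: Sync](n: Int): F[String] =
    Sync[F].delay(s"value: $n")

  val inSyncIO: SyncIO[String] = describe[SyncIO](1)
  val inIO: IO[String]         = describe[IO](2)
  val inOptionT: OptIO[String] = describe[OptIO](3)
}
```

Had describe asked for ConcurrentEffect[F], neither the SyncIO nor the OptionT call would compile.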
Async, so that I can run many of them, and use Deferred as the data type?
Async doesn't let you "run many of them" out of the box - if your underlying API is thread-blocking, you can't get around thread management, but you should be able to get by with only Sync[F] + ContextShift[F] + Blocker
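A minimal sketch of the Sync[F] + ContextShift[F] + Blocker combination, assuming cats-effect 2.x. legacyFetch is a made-up stand-in for the thread-blocking API, and BlockingCalls and fetch are illustrative names.

```scala
import cats.effect._

object BlockingCalls {
  // Hypothetical thread-blocking call standing in for the underlying API.
  def legacyFetch(id: Int): String = {
    Thread.sleep(10)
    s"row-$id"
  }

  // Runs the blocking call on the Blocker's pool, then shifts back via ContextShift.
  def fetch[F[_]: Sync: ContextShift](blocker: Blocker)(id: Int): F[String] =
    blocker.delay[F, String](legacyFetch(id))
}
```

With IO this could be run as Blocker[IO].use(b => BlockingCalls.fetch[IO](b)(1)), given an implicit ContextShift[IO]. The calls still occupy a thread each while they block; the Blocker only keeps them off the main compute pool.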
Concurrent on the outside, to manage two slots independently, but that is irrelevant
Concurrent is probably better, since you get a cancelable version of Deferred when you have it
scala> :paste
// Entering paste mode (ctrl-D to finish)
import cats.effect._
import cats.implicits._
import cats.effect.implicits._
import scala.concurrent.ExecutionContext
val cs = IO.contextShift(ExecutionContext.global)
def parMapIO(implicit cs: ContextShift[IO]) = {
  val ioA = IO("A")
  val ioB = IO(10)
  val ioC = IO(false)
  (ioA, ioB, ioC).parMapN { (a, b, c) => println(s"done: $a $b $c") }
}
scala> parMapIO(cs).unsafeRunSync
done: A 10 false
scala> :paste
// Entering paste mode (ctrl-D to finish)
def parMapF[F[_]](implicit F: ConcurrentEffect[F], cs: ContextShift[F]) = {
  val ioA = F.delay("A")
  val ioB = F.delay(10)
  val ioC = F.delay(false)
  (ioA, ioB, ioC).parMapN { (a, b, c) => println(s"$a > $b > $c") }
}
// Exiting paste mode, now interpreting.
<pastie>:25: error: could not find implicit value for parameter p: cats.NonEmptyParallel[F,F]
(ioA, ioB, ioC).parMapN { (a, b, c) => println(s"$a > $b > $c") }
^
<pastie>:21: warning: parameter value F in method parMapF is never used
def parMapF[F[_]](implicit F: ConcurrentEffect[F], cs: ContextShift[F]) = {
^
<pastie>:21: warning: parameter value cs in method parMapF is never used
def parMapF[F[_]](implicit F: ConcurrentEffect[F], cs: ContextShift[F]) = {
^
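One way to get past the missing NonEmptyParallel instance is to ask for a Parallel instance explicitly. This is a sketch that assumes cats 2.x and cats-effect 2.x, where Parallel takes a single type parameter; the error above shows the two-parameter form from cats 1.x, where the constraint would need both types. ParMapGeneric is an illustrative name, and the function returns the string rather than printing it so the result can be inspected.

```scala
import cats.Parallel
import cats.effect._
import cats.implicits._

object ParMapGeneric {
  // Sync supplies delay; Parallel is what parMapN actually needs for a generic F.
  def parMapF[F[_]: Sync: Parallel]: F[String] = {
    val fa = Sync[F].delay("A")
    val fb = Sync[F].delay(10)
    val fc = Sync[F].delay(false)
    (fa, fb, fc).parMapN((a, b, c) => s"$a > $b > $c")
  }
}
```

For IO, the Parallel instance is derived from an implicit ContextShift[IO], so the ContextShift still has to be in scope at the call site, for example ParMapGeneric.parMapF[IO].unsafeRunSync().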