scala> :paste
// Entering paste mode (ctrl-D to finish)
import cats.effect._
import cats.implicits._
import cats.effect.implicits._
import scala.concurrent.ExecutionContext
// ContextShift for IO backed by the global execution context; passed explicitly to parMapIO below.
val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
/** Combines three independent `IO` values with `parMapN` and prints the results.
  *
  * @param cs the `ContextShift[IO]` that must be implicitly in scope for the `parMapN` call
  * @return an `IO` that, when run, prints one "done: ..." line containing the three values
  */
def parMapIO(implicit cs: ContextShift[IO]) = {
  val label = IO("A")
  val count = IO(10)
  val flag = IO(false)
  // The callback receives the three results together once all of them are available.
  (label, count, flag).parMapN((a, b, c) => println(s"done: $a $b $c"))
}
scala> parMapIO(cs).unsafeRunSync
done: A 10 false
scala> :paste
// Entering paste mode (ctrl-D to finish)
// Effect-polymorphic variant of the parMapN example: builds three values with F.delay
// and tries to combine them with parMapN.
// NOTE(review): as written this does not compile. The compiler output that follows reports
// that no implicit cats.NonEmptyParallel[F,F] could be found at the parMapN call, i.e. the
// ConcurrentEffect[F] and ContextShift[F] evidence alone did not supply one here.
// Presumably the Parallel evidence has to be requested as an additional implicit — TODO confirm
// against the cats / cats-effect version in use.
def parMapF[F[_]](implicit F: ConcurrentEffect[F], cs: ContextShift[F]) = {
val ioA = F.delay("A")
val ioB = F.delay(10)
val ioC = F.delay(false)
// This is the line the compiler rejects (missing NonEmptyParallel[F,F]).
(ioA, ioB, ioC).parMapN { (a, b, c) => println(s"$a > $b > $c") }
}
// Exiting paste mode, now interpreting.
<pastie>:25: error: could not find implicit value for parameter p: cats.NonEmptyParallel[F,F]
(ioA, ioB, ioC).parMapN { (a, b, c) => println(s"$a > $b > $c") }
^
<pastie>:21: warning: parameter value F in method parMapF is never used
def parMapF[F[_]](implicit F: ConcurrentEffect[F], cs: ContextShift[F]) = {
^
<pastie>:21: warning: parameter value cs in method parMapF is never used
def parMapF[F[_]](implicit F: ConcurrentEffect[F], cs: ContextShift[F]) = {
^
`Long.MaxValue` as the number of permits is a fine way of getting what I need. What are your thoughts?
oldFiber.cancel
and buzzer.complete
but it's very very likely innocuous
// Akka Streams setup: the actor system, its materializer, and the system dispatcher as the implicit executor.
implicit val system: ActorSystem = ActorSystem("QuickStart")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executor = system.getDispatcher
// Emits "H" once per second with no initial delay and prints every element.
// The stream is run right here, so `ticker` holds the result of running it, not a description of it.
val ticker =
  Source
    .tick(0.second, 1.second, "H")
    .runWith(Sink.foreach(println))
ticker
of the ticker is Future[Done]
`fs2-reactive-streams` will get you a stream where multiple libraries form the connecting pieces of it, which might be what you want. Another alternative is to just do an `unsafeRunSync` inside of the `foreach` (not my first choice, but it should "work"). Personally, if it were me, I would defer to fs2 as much as possible, as it has direct support for cats-effect.
`fs2-reactive-streams`. If your application is already mostly Akka Streams and you just want to run some isolated pure code in there, just running the `IO` is also an option (albeit a compromise).
`run` the Akka stream, since `Future`s start immediately. Put the `run` expression in `IO.fromFuture`.