Most discussion is on Typelevel Discord: https://discord.gg/bSQBZA3Ced
evalOn
only changes what EC the IO runs on
evalOn
, it would fully consume your 8 cores to wait for the blocking call
evalMap(...).foldMonoid
or evalMap(...).compile.foldMonoid
should give you what you need in the two cases
Pull
mapAccumulate
, then see if you need more powerful)
Ref
(sometimes Ref
is easier for non-concurrent stuff as well, at which point it becomes a matter of style/preference)
are three mutually exclusive approaches
as a guideline ofc, I'm sure there are plenty of cases where this statement is false
mapAccumulate
: https://scastie.scala-lang.org/CBzo7dS0QGCzvTnbwUGJGQrepartition
. but I'm not sure it fits your case https://github.com/functional-streams-for-scala/fs2/blob/master/core/shared/src/main/scala/fs2/Stream.scala#L2373
I have a use case where streamA
publishes to some internal (bounded) queue while streamB
consumes from said queue and publishes data to some external system. My current approach is something like this:
trait MyPublisher[A] {
def publish(in: A): F[Unit]
}
// publisher should complete external publishing of all elements
// internally published before the stream terminates
def makeSafePublisher[A]: Stream[F, MyPublisher] {
val queue = Queue.boundedNoneTerminated[F, A](maxSize)
val internalPublisher = new MyPublisher[A] {
override def publish(in: A) = queue.enqueue1(in)
}
val internalPublish = Stream.emit(internalPublisher).onComplete(queue.enqueue1(None))
val externalPublish = queue.dequeue.evalMap(publishToKafka)
internalPublish concurrently externalPublish
However, it seems the system can shutdown while elements remain in the internal queue. What is the safest way to implement makeSafePublisher
that will ensure the internal queue is fully drained?
Traverse
like other Collection classes? Say I have the same situation in https://stackoverflow.com/questions/53806418/chaining-a-number-of-transitions-with-the-state-monad
traverse
, so the answer does not work in my situation
IO
monad can't because it represents a non-referentially transparent computation
evalMap
and others, but the "leave stream" part needs to go through the .compile
step
Stream
for F=Pure
could have such an instance
F
couldn't