Most discussion is on Typelevel Discord: https://discord.gg/bSQBZA3Ced
`zip` processes one stream and then the other
`zip`-like stuff is unfortunate for learning `Pull` because you need to be aware of `stepLeg`. I'd recommend using `Pull` on only one stream to get familiar with it; it's actually pretty simple
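For instance, a single-stream `take` is about the smallest useful `Pull` sketch (names here are illustrative, not from the thread):

```scala
import fs2.{Pull, Stream}

// A minimal single-stream Pull example: reimplementing `take`.
// uncons1 pulls one element at a time; output1 re-emits it downstream.
def take[F[_], O](s: Stream[F, O], n: Long): Stream[F, O] = {
  def go(s: Stream[F, O], n: Long): Pull[F, O, Unit] =
    if (n <= 0) Pull.done
    else
      s.pull.uncons1.flatMap {
        case Some((hd, tl)) => Pull.output1(hd) >> go(tl, n - 1)
        case None           => Pull.done
      }
  go(s, n).stream
}
```

e.g. `take(Stream(1, 2, 3, 4), 2).toList` gives `List(1, 2)`.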
You want a `Blocker` and not any old EC: `evalOn` only changes what EC the `IO` runs on, so with a bounded compute pool and `evalOn` it would fully consume your 8 cores to wait for the blocking call
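A minimal cats-effect 2 sketch of the difference (the blocking thunk here is a stand-in, not real I/O):

```scala
import cats.effect.{Blocker, ContextShift, IO}
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// Blocker wraps an unbounded pool intended for blocking calls; `delay`
// shifts the thunk onto that pool and back, so no compute thread is parked.
val blockingCall: IO[Int] =
  Blocker[IO].use { blocker =>
    blocker.delay[IO, Int] {
      Thread.sleep(50) // pretend this is a real blocking call
      42
    }
  }
```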
`evalMap(...).foldMonoid` or `evalMap(...).compile.foldMonoid` should give you what you need in the two cases
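Sketch of the two variants (the effectful step `lookup` is a placeholder):

```scala
import cats.effect.IO
import cats.implicits._
import fs2.Stream

def lookup(i: Int): IO[Int] = IO.pure(i * 10) // hypothetical effectful step

val s = Stream(1, 2, 3).covary[IO]

// stays a Stream: emits the combined value as a single element
val folded: Stream[IO, Int] = s.evalMap(lookup).foldMonoid

// compiles the stream down to an effect yielding the combined value
val summed: IO[Int] = s.evalMap(lookup).compile.foldMonoid
```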
`Pull`, `mapAccumulate` (try `mapAccumulate` first, then see if you need something more powerful), and `Ref` (sometimes `Ref` is easier for non-concurrent stuff as well, at which point it becomes a matter of style/preference) are three mutually exclusive approaches. As a guideline ofc, I'm sure there are plenty of cases where this statement is false
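e.g. a running sum with `mapAccumulate`, as a pure sketch:

```scala
import fs2.Stream

// mapAccumulate threads a state through the stream and
// emits (new state, output) pairs
val runningSums = Stream(1, 2, 3, 4)
  .mapAccumulate(0)((sum, i) => (sum + i, i))
  .toList
// List((1,1), (3,2), (6,3), (10,4))
```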
`mapAccumulate`: https://scastie.scala-lang.org/CBzo7dS0QGCzvTnbwUGJGQ
there's also `repartition`, but I'm not sure it fits your case: https://github.com/functional-streams-for-scala/fs2/blob/master/core/shared/src/main/scala/fs2/Stream.scala#L2373
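For reference, the kind of regrouping `repartition` does (this mirrors the example in its scaladoc; it needs a `Semigroup` for the element type):

```scala
import cats.implicits._
import fs2.{Chunk, Stream}

// repartition splits each element into a chunk and recombines
// adjacent pieces with the Semigroup, moving the element boundaries
val words = Stream("Hel", "lo Wor", "ld")
  .repartition(s => Chunk.array(s.split(" ")))
  .toList
// List("Hello", "World")
```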
I have a use case where `streamA` publishes to some internal (bounded) queue while `streamB` consumes from said queue and publishes data to some external system. My current approach is something like this:
trait MyPublisher[A] {
  def publish(in: A): F[Unit]
}

// publisher should complete external publishing of all elements
// internally published before the stream terminates
def makeSafePublisher[A]: Stream[F, MyPublisher[A]] =
  Stream.eval(Queue.boundedNoneTerminated[F, A](maxSize)).flatMap { queue =>
    val internalPublisher = new MyPublisher[A] {
      override def publish(in: A) = queue.enqueue1(Some(in))
    }
    val internalPublish =
      Stream.emit(internalPublisher).onComplete(Stream.eval_(queue.enqueue1(None)))
    val externalPublish = queue.dequeue.evalMap(publishToKafka)
    internalPublish concurrently externalPublish
  }
However, it seems the system can shut down while elements remain in the internal queue. What is the safest way to implement `makeSafePublisher` that will ensure the internal queue is fully drained?
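One possible shape for this (a sketch, not a definitive answer): run the drain as a fiber inside a bracket, and on release enqueue the `None` terminator and join the fiber, so the stream's scope cannot close until the consumer has seen everything. `publishToExternal` and `maxSize` are placeholders for your own pieces:

```scala
import cats.effect.Concurrent
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Queue

trait MyPublisher[F[_], A] { def publish(in: A): F[Unit] }

def makeSafePublisher[F[_]: Concurrent, A](maxSize: Int)(
    publishToExternal: A => F[Unit]): Stream[F, MyPublisher[F, A]] =
  Stream.eval(Queue.boundedNoneTerminated[F, A](maxSize)).flatMap { queue =>
    val publisher = new MyPublisher[F, A] {
      def publish(in: A): F[Unit] = queue.enqueue1(Some(in))
    }
    val drain: F[Unit] = queue.dequeue.evalMap(publishToExternal).compile.drain
    // acquire: start the consumer in the background
    // release: terminate the queue, then wait for the consumer to drain it
    Stream
      .bracket(drain.start)(fiber => queue.enqueue1(None) >> fiber.join)
      .as(publisher)
  }
```

Unlike `concurrently`, the bracket release never interrupts the consumer; it waits for it to finish naturally once the `None` arrives.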