Most discussion is on Typelevel Discord: https://discord.gg/bSQBZA3Ced
Ref
(sometimes Ref
is easier for non-concurrent stuff as well, at which point it becomes a matter of style/preference)
are three mutually exclusive approaches
as a guideline ofc, I'm sure there are plenty of cases where this statement is false
mapAccumulate
: https://scastie.scala-lang.org/CBzo7dS0QGCzvTnbwUGJGQrepartition
. but I'm not sure it fits your case https://github.com/functional-streams-for-scala/fs2/blob/master/core/shared/src/main/scala/fs2/Stream.scala#L2373
I have a use case where streamA
publishes to some internal (bounded) queue while streamB
consumes from said queue and publishes data to some external system. My current approach is something like this:
trait MyPublisher[A] {
def publish(in: A): F[Unit]
}
// publisher should complete external publishing of all elements
// internally published before the stream terminates
def makeSafePublisher[A]: Stream[F, MyPublisher] {
val queue = Queue.boundedNoneTerminated[F, A](maxSize)
val internalPublisher = new MyPublisher[A] {
override def publish(in: A) = queue.enqueue1(in)
}
val internalPublish = Stream.emit(internalPublisher).onComplete(queue.enqueue1(None))
val externalPublish = queue.dequeue.evalMap(publishToKafka)
internalPublish concurrently externalPublish
However, it seems the system can shutdown while elements remain in the internal queue. What is the safest way to implement makeSafePublisher
that will ensure the internal queue is fully drained?
Traverse
like other Collection classes? Say I have the same situation in https://stackoverflow.com/questions/53806418/chaining-a-number-of-transitions-with-the-state-monad
traverse
, so the answer does not work in my situation
IO
monad can't because it represents a non-referentially transparent computation
evalMap
and others, but the "leave stream" part needs to go through the .compile
step
Stream
for F=Pure
could have such an instance
F
couldn't
StateT
, but there is nothing wrong with doing what you're doing at first, so definitely keep exploring :)
compile.list
:P
Nothing
for a method impl satisfies the contract or not
evalMap
is... Any advice?
F[_]
, then evaluates the F
to get the element back
pure
so it might be easier to skip the T
and just use State
Stream.eval(first) ++ Stream.eval(second) ++ Stream.eval(third)
but then why are you using Stream
? Could just have first >> second >> third