Most discussion is on Typelevel Discord: https://discord.gg/bSQBZA3Ced
but on using the second "copy" of that stream, I get the above error
right, you need to fully consume the body (bytes) once and then make any needed transformation, same with Http4s
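A minimal sketch of that, assuming an http4s-style body of type Stream[F, Byte] (bufferedBody is just an illustrative name):

import cats.effect.Sync
import cats.syntax.functor._
import fs2.Stream

// buffer the bytes once, then hand out as many fresh streams over them as needed
def bufferedBody[F[_]: Sync](body: Stream[F, Byte]): F[Stream[F, Byte]] =
  body.compile.toVector.map(bytes => Stream.emits(bytes).covary[F])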
def broadcasting[F[_]: Concurrent, A, B](pipes: fs2.Stream[F, Pipe[F, A, B]]): Pipe[F, A, B] =
  (input: fs2.Stream[F, A]) =>
    input.broadcast
      .zipWith(pipes)(_.through(_))
      .parJoinUnbounded
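For what it's worth, a quick usage sketch of that pipe (source, pipeA and pipeB are illustrative names):

val out: fs2.Stream[F, B] = source.through(broadcasting(fs2.Stream(pipeA, pipeB)))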
java.lang.IllegalStateException: This publisher only supports one subscriber
broadcast would be ideal if it works
I have a test which does:
val test: IO[Assertion] =
  Stream.eval(setup).flatMap { data =>
    myStreamResource(data).flatMap { obj =>
      obj.resultStream.take(1).evalMap(r => IO(assert(r.isOk)))
    }
  }.compile.lastOrError
^ And that doesn't terminate, where myStreamResource's obj.resultStream is a parJoin of multiple bracketed streams
fail(), then the test fails on that line
global or the scalatest EC
Hi all - I'm looking for a way to gracefully terminate a stream. When I cancel a stream backed by a queue, I would like for the queue to drain before terminating. Here's an example of what I'm looking for https://scastie.scala-lang.org/31krqh0ZSWuxPbZ3G9S2tQ
val stream = Stream.eval(InspectableQueue.bounded[IO, Int](5))
  .flatMap { queue =>
    Stream.range(0, 100).covary[IO].evalTap(i => IO(println(s"enqueued $i"))).through(queue.enqueue)
      .concurrently(queue.dequeue.evalTap(i => IO(println(s"dequeued $i"))).metered(1.second))
  }

val program = IO.race(
  stream.compile.drain,
  IO.sleep(2.seconds)
)

// I would like to see a "dequeue" for every "enqueue"
program.unsafeRunSync()
I'm coming from akka-streams, and using that I would first wait to close the queue until my downstream has consumed all of the outstanding messages
Thanks for the suggestions @Billzabob - I'll take a look. It's worth noting that I'm not actually using concurrently for the "real" code - it looks like this:
val messages = for {
  // resource is a BoundedQueue. An async process enqueues messages
  queue    <- Stream.resource(Resource(resource))
  messages <- queue.dequeue
} yield messages

// Started inside of an IOApp.run
messages.parEvalMap(10)(myProcess).evalMap(ackMessage).compile.drain
My goal is to be able to cancel the processing stream (by killing the running IOApp) and not have any unacked messages in the queue. In other words, I'm really looking for a way to gracefully terminate the stream.
concurrently does not work in this case since it uses interruptWhen, and ultimately that relies on the interruption mechanics, which are non-deterministic by design
I could use the Concurrent#cancelable builder to explicitly enqueue a None value when the stream is cancelled. The downside to that is I can't expose a Stream[F, Message] in my API and still have the "graceful" shutdown. Do you know of a way to enqueue a None value and still expose a stream in my public API?
@hudclark re: having code wait for "clean" shutdown of a NoneTerminatedQueue
It's easiest if you can control producer+consumer in one location because you can do something like:
// consumer side: drain the queue, then flip the "finished" signal
val read  = q.dequeue.flatMap(consume) ++ Stream.eval(isFinishedSignalRef.set(true))
// producer side: run the producer, then terminate the queue with a None
val write = producer.run(q) ++ Stream.eval(q.enqueue1(None))

Stream(
  read concurrently write,
  // cleanup only runs once the consumer has finished draining
  isFinishedSignalRef.discrete.dropWhile(_ == false).take(1) >> runOnTermination
).parJoinUnbounded
Resource or something
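To make that concrete, a self-contained sketch of the same pattern (fs2 2.x / cats-effect 2 assumed; the range producer, the println consumer and the "clean shutdown" println stand in for the real producer, consume and runOnTermination):

import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.{Queue, SignallingRef}
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val program: Stream[IO, Unit] = for {
  q        <- Stream.eval(Queue.boundedNoneTerminated[IO, Int](10))
  finished <- Stream.eval(SignallingRef[IO, Boolean](false))
  _ <- {
    // consumer: drain the queue until the producer's None, then flip the signal
    val read  = q.dequeue.evalMap(i => IO(println(s"consumed $i"))) ++ Stream.eval(finished.set(true))
    // producer: emit everything, then terminate the queue with a None
    val write = Stream.range(0, 5).covary[IO].map(Option(_)).through(q.enqueue) ++ Stream.eval(q.enqueue1(None))
    Stream(
      read.concurrently(write),
      // cleanup runs only after the consumer has fully drained the queue
      finished.discrete.dropWhile(_ == false).take(1) >> Stream.eval(IO(println("clean shutdown")))
    ).covary[IO].parJoinUnbounded
  }
} yield ()

Running program.compile.drain should print the five "consumed" lines before "clean shutdown", since the signal only flips after the consumer dequeues the terminating None.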
Stream.eval(InspectableQueue.bounded[F, Option[PubSubMessageEnvelope[F]]](settings.prefetch)).flatMap { queue =>
  val read = queue.dequeue.unNoneTerminate
  // When the publisher closes, it will enqueue a None, stopping the queue
  val publisher = Stream.resource(Resource(createPublisher(queue)))
  val stopper = publisher *> Stream.eval(stop.get.attempt).take(1)
  read concurrently stopper
}
Hello there!
Guys, is it possible to have a dynamic broadcast? I mean, from the docs:
Stream(1,2,3,4).broadcast.map { worker =>
  worker.evalMap { o => IO.println("1:" + o.toString) }
}.take(3).parJoinUnbounded
But what if I have to deal with N streams (not 3) and this number can change over time?
val processors = List[Int => Boolean](
  v => v > 5,
  v => v > 10,
  // ...
)

Stream(1,2,3,4)
  .covary[IO]
  .through(Broadcast(minReady = 1))
  .take(processors.length)
  .zipWithIndex
  // map (not flatMap) so this stays a stream of streams for parJoinUnbounded
  .map { case (src, idx) => src.filter(processors(idx.toInt)).fold(...) }
  .parJoinUnbounded
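A compiling variant of that sketch, under the assumption that the set of processors is fixed by the time the stream is assembled (fs2 2.x assumed; adding or removing processors while the stream is running would need a different design, e.g. a Topic with subscribers coming and going):

import cats.effect.{ContextShift, IO}
import fs2.{Pipe, Stream}
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// one pipe per processor; the list can be built dynamically before the stream starts
val processors: List[Pipe[IO, Int, Int]] = List(
  _.filter(_ > 5),
  _.filter(_ > 10)
)

// broadcastThrough sends every source element to each pipe and merges their outputs
val out: Stream[IO, Int] =
  Stream(1, 2, 3, 4, 11, 20).covary[IO].broadcastThrough(processors: _*)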