f so, how does one orchestrate the consumer streams?
you don't need to do this explicitly, once you deploy multiple services, each with one consumer, in the same consumer group, they will coordinate among themselves to divvy up the partitions of the topic
delete
only has a test for whether it fails if it's non existant
Sync
allows SyncIO
whereas Concurrent
doesn't? Maybe it's not important though. I'll leave it to yourself and other reviewers make that judgement :smile:
def monotonicAscendingMerge[A](as: Stream[Pure, A], bs: Stream[Pure, A])(implicit ev: A => Ordered[A]): Stream[Pure, A] = {
(as, bs) match {
case (Stream.empty, _) => bs
case (_, Stream.empty) => as
case (aHead#::aTail, bHead#::bTail) =>
if (aHead < bHead) aHead #:: monotonicAscendingMerge(aTail, bs)
else bHead #:: monotonicAscendingMerge(as, bTail)
}
def monotonicAscendingMerge[F[_], A](as: Stream[F, A], bs: Stream[F, A])(implicit ev: A => Ordered[A]): Stream[F, A] = {
// take two streams or ordered types that are monotonically increasing and
// create a result stream that merges the input streams to
// produce an output that is monotonically increasing
def go(inL: Option[(A, Stream[F, A])], inR: Option[(A, Stream[F, A])]): Pull[F, A, Unit] =
(inL, inR) match {
case (None, None) => Pull.done
case (Some((lhs, lhsNext)), None) =>
Pull.output1(lhs) >> lhsNext.pull.uncons1.flatMap(go(_, None))
case (None, Some((rhs, rhsNext))) =>
Pull.output1(rhs) >> rhsNext.pull.uncons1.flatMap(go(None, _))
case (Some((lhs, lhsNext)), Some((rhs, rhsNext))) =>
if (lhs < rhs)
Pull.output1(lhs) >> lhsNext.pull.uncons1.flatMap(go(_, inR))
else
Pull.output1(rhs) >> rhsNext.pull.uncons1.flatMap(go(inL, _))
}
as.pull.uncons1.flatMap(uca => bs.pull.uncons1.flatMap(ucb => go(uca, ucb))).stream
}
StepLeg
when you Pull
from more than one stream
def data(id: Int): Stream[F, Row] = {
for {
q <- Stream.eval(Queue.noneTerminated[F, Row])
_ <- Stream.eval { Concurrent[F].delay {
someApi.asyncData(id, new AsyncDataHandler {
override def onRow(r: Row): Unit = q.enqueue1(Some(r))
override def onEnd(): Unit = q.enqueue1(None)
})
}
}
r <- q.dequeue
} yield r
}
val rowsEndpoint = HttpRoutes.of[IO] {
case GET -> Root / “data" / IntVar(id) => {
Ok(data(id).map(r =>
s""”{"col1": ${r.col1()}}"""
))
}
}
onRow()
and onEnd()
are being hit