def monotonicAscendingMerge[A](as: Stream[Pure, A], bs: Stream[Pure, A])(implicit ev: A => Ordered[A]): Stream[Pure, A] = {
(as, bs) match {
case (Stream.empty, _) => bs
case (_, Stream.empty) => as
case (aHead#::aTail, bHead#::bTail) =>
if (aHead < bHead) aHead #:: monotonicAscendingMerge(aTail, bs)
else bHead #:: monotonicAscendingMerge(as, bTail)
}
def monotonicAscendingMerge[F[_], A](as: Stream[F, A], bs: Stream[F, A])(implicit ev: A => Ordered[A]): Stream[F, A] = {
// take two streams or ordered types that are monotonically increasing and
// create a result stream that merges the input streams to
// produce an output that is monotonically increasing
def go(inL: Option[(A, Stream[F, A])], inR: Option[(A, Stream[F, A])]): Pull[F, A, Unit] =
(inL, inR) match {
case (None, None) => Pull.done
case (Some((lhs, lhsNext)), None) =>
Pull.output1(lhs) >> lhsNext.pull.uncons1.flatMap(go(_, None))
case (None, Some((rhs, rhsNext))) =>
Pull.output1(rhs) >> rhsNext.pull.uncons1.flatMap(go(None, _))
case (Some((lhs, lhsNext)), Some((rhs, rhsNext))) =>
if (lhs < rhs)
Pull.output1(lhs) >> lhsNext.pull.uncons1.flatMap(go(_, inR))
else
Pull.output1(rhs) >> rhsNext.pull.uncons1.flatMap(go(inL, _))
}
as.pull.uncons1.flatMap(uca => bs.pull.uncons1.flatMap(ucb => go(uca, ucb))).stream
}
StepLeg
when you Pull
from more than one stream
def data(id: Int): Stream[F, Row] = {
for {
q <- Stream.eval(Queue.noneTerminated[F, Row])
_ <- Stream.eval { Concurrent[F].delay {
someApi.asyncData(id, new AsyncDataHandler {
override def onRow(r: Row): Unit = q.enqueue1(Some(r))
override def onEnd(): Unit = q.enqueue1(None)
})
}
}
r <- q.dequeue
} yield r
}
val rowsEndpoint = HttpRoutes.of[IO] {
case GET -> Root / “data" / IntVar(id) => {
Ok(data(id).map(r =>
s""”{"col1": ${r.col1()}}"""
))
}
}
onRow()
and onEnd()
are being hit
Dispatcher
because the testing in fs2-grpc is a bit hairy, so I did a new trait UnsafeRunner
that just wraps Dispatcher
and allows for stubbing. However, that is not needed after Fabio's change that got merged.
Hi all. I'd appreciate a steer from those that know better. I'm attempting to implement a pipe to merge/align several streams with a signature like this:
def mergeIndexed[F[_], A]: Pipe[F, Seq[IndexedSeq[A]], IndexedSeq[Seq[A]]] = ???
where I'm going to keep elements in my final stream only when all indices are deemed 'aligned'.
Before I get into the weeds on this, is the right approach here to look at existing deterministic merging pipes and use StepLeg
as they do?
metered
is not working for you https://github.com/SystemFw/upperbound
Stream.fixedRate
or Stream.fixedDelay