AnyVal
?
btw, i spoke too soon earlier. as i said, i wanted:
the resulting stream of pairs to terminate only when both input streams terminate
so, i had to use zipWithPrevious
at which point it got pretty gnarly...
// Demo source of the integers 1 until 10 (upper bound exclusive), paced with a 25 ms meter.
val nums: Stream[IO, Int] =
  Stream.range[IO](1, 10).metered(25.millis)

// Demo source of the characters 'a' through 'z', paced with a 10 ms meter.
// The range is built over the character codes and mapped back to Char.
val chars: Stream[IO, Char] =
  Stream
    .range[IO]('a'.toInt, 'z'.toInt + 1)
    .map(code => code.toChar)
    .metered(10.millis)
// A (previous, current) pair of optional elements, as produced by noneTermAndZipWithPrev.
type PrevCurr[A] = (Option[A], Option[A])
/** Pipe that marks the end of its input and pairs every element with its predecessor.
  *
  * The input is first passed through `noneTerminate`, then `zipWithPrevious`; the
  * resulting nested `Option[Option[A]]` for the previous element is flattened so
  * each output is a `PrevCurr[A]`.
  */
def noneTermAndZipWithPrev[A]: Pipe[IO, A, PrevCurr[A]] = { in =>
  val terminated = in.noneTerminate
  terminated.zipWithPrevious.map { pair =>
    // pair._1 is Option[Option[A]] (no predecessor vs. predecessor value); collapse it.
    (pair._1.flatten, pair._2)
  }
}
/** Combines the latest (previous, current) snapshots of two inputs into an optional pair.
  *
  * - Both sides have a current element: pair the two current elements.
  * - One side has no current element but has a previous one: pair the other
  *   side's current element with that previous element.
  * - Otherwise no pair can be formed and the result is `None`.
  *
  * The function is total: combinations the original match did not list (for
  * example a side that is `(None, None)` while the other side still has a
  * current element) yield `None` instead of throwing a `MatchError`.
  *
  * @tparam A element type of the first input
  * @tparam B element type of the second input
  * @return a function from both snapshots to the pair to emit, if any
  */
def combine[A, B]: (PrevCurr[A], PrevCurr[B]) => Option[(A, B)] = {
  case ((_, Some(aCurr)), (_, Some(bCurr))) => (aCurr -> bCurr).some
  case ((_, Some(aCurr)), (Some(bPrev), None)) => (aCurr -> bPrev).some
  case ((Some(aPrev), None), (_, Some(bCurr))) => (aPrev -> bCurr).some
  // Covers both sides being without a current element, and a side that has
  // neither a current nor a previous element to fall back on.
  case _ => None
}
// Zips `nums` and `chars` by holding the latest PrevCurr snapshot of each in a
// signal, combining the two signals with `combine`, and emitting the combined
// value until `combine` yields None (via unNoneTerminate).
//
// The `hold` seeds are placeholders. The nums seed is typed as Int to match the
// elements of `nums`, so the held type is not widened to AnyVal. The chars seed
// uses the '\u0000' unicode escape because octal escapes such as '\0' are
// rejected by Scala 2.13.
// NOTE(review): the seeds look like they can surface as an initial (1, '\u0000')
// pair before real data arrives — confirm that is intended.
val zipped = for {
  a <- nums.through(noneTermAndZipWithPrev).hold(none[Int] -> 1.some)
  b <- chars.through(noneTermAndZipWithPrev).hold(none[Char] -> '\u0000'.some)
  c <- (a, b).mapN(combine).discrete.unNoneTerminate
} yield c
Stream
of pairs with?
def unzip[F[_], A, B]: Pipe[F, (A, B), (Stream[F, A], Stream[F, B])] = ???
collect
collect
to implement your unzip