Most discussion is on Typelevel Discord: https://discord.gg/bSQBZA3Ced
is there already a utility for batching based on arbitrary sizing of each element of the stream?
e.g., something that has the shape of:
def batcher[F[_], E](threshold: Int, sizeOf: E => Int): Pipe[F, E, NonEmptyVector[E]] = ???
which can then be used like this:
case class Group(id: Long, xs: NonEmptyVector[Symbol])
val groups: Stream[Pure, Group] = ???
val batched: Stream[Pure, NonEmptyVector[Group]] = groups.through(batcher(100, _.xs.length))
https://github.com/vectos/formulation/tree/rpc/rpc/src/main/scala/formulation/rpc
FS2 code is mainly in AvroServer/AvroClient/Example
s.concurrently(SoS.join)
is not onerous syntactically
SoS
that you then throw away
join
without allocating all the queues