metered is not working for you https://github.com/SystemFw/upperbound
Stream.fixedRate or Stream.fixedDelay
parJoin states "maxOpen - Maximum number of open inner streams at any time". Does that really mean what it says, and not the number of streams that can be processed in parallel? If it's the former, then I guess I should typically just set it to the total number of inner streams. Just out of curiosity, what happens if maxOpen is smaller than the number of inner streams?
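On the maxOpen question, a small experiment may help. The sketch below assumes fs2 2.x with cats-effect 2; the object name ParJoinSketch and the five toy inner streams are made up for illustration. It runs five inner streams with maxOpen = 2: at most two are open at once, and the remaining ones are opened as earlier ones finish, so every element still arrives.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import scala.concurrent.ExecutionContext

object ParJoinSketch {
  // parJoin needs Concurrent[IO], which in cats-effect 2 requires a ContextShift
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Five inner streams (hypothetical), each emitting its own index
  val inners: Stream[IO, Stream[IO, Int]] =
    Stream.range(0, 5).map(i => Stream.eval(IO(i)))

  // At most two inner streams are open at once; the other three wait their turn
  val joined: Stream[IO, Int] = inners.parJoin(2)

  def main(args: Array[String]): Unit =
    // Arrival order is nondeterministic, so sort before printing
    println(joined.compile.toList.unsafeRunSync().sorted) // List(0, 1, 2, 3, 4)
}
```

So maxOpen works as a concurrency bound rather than a cap on how many inner streams get processed in total.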
Hi guys. I have been working on some quite sophisticated nested stream processing and found that one way to approach the problem could be an evalFold method in Stream (which would look like the regular fold but would take an (O2, O) => F[O2] function). But there's no such method in Stream, although there are many other eval* counterparts for effectless methods.
I wonder, is it due to some conceptual concerns regarding such a method, or does evalFold just still await its hero to get implemented?
To me it should be as easy as the following:
def evalFold[F2[x] >: F[x], O2](z: O2)(f: (O2, O) => F2[O2]): Stream[F2, O2] = {
  def go(in: Stream[F2, O])(zz: O2): Pull[F2, INothing, O2] = {
    in.pull.uncons1.flatMap {
      case None => Pull.pure(zz)
      case Some((hd, tl)) =>
        Pull.eval(f(zz, hd)).flatMap { go(tl) }
    }
  }
  go(this)(z).flatMap(Pull.output1).stream
}
I borrowed it from the implementations of evalScan and regular fold, actually.
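For comparison, the same result can probably be had from combinators that already exist. This is a minimal sketch, assuming fs2 2.x, where evalScan emits the initial value z first and then each intermediate accumulator, so taking the last element gives the fold result. The standalone helper and the object name EvalFoldSketch are illustrative, not part of the library.

```scala
import cats.effect.IO
import fs2.Stream

object EvalFoldSketch {
  // Effectful fold built from evalScan: keep only the final accumulator.
  // lastOr(z) covers the empty-stream case defensively.
  def evalFold[F[_], O, O2](in: Stream[F, O])(z: O2)(f: (O2, O) => F[O2]): Stream[F, O2] =
    in.evalScan(z)(f).lastOr(z)

  // Sum 1, 2, 3 with an effectful step function
  val summed: Stream[IO, Int] =
    evalFold(Stream(1, 2, 3).covary[IO])(0)((acc, n) => IO(acc + n))

  def main(args: Array[String]): Unit =
    println(summed.compile.toList.unsafeRunSync()) // List(6)
}
```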
eval versions of everything?
.drain ++ Stream(()), or void.foldMonoid
for
in general
++
really
Resource
/F
anyway
Hi, I am combining finch with fs2 and I have a Stream[IO, Array[Byte]] which I am trying to write using writeAll. But the compiler complains about diverging implicit expansion for type cats.effect.Sync[F] starting with method catsReaderWriteStateTSync in object Sync on the writeAll call.
Am I missing an import or something?
import fs2.Stream
import fs2.io
import java.nio.file._
import cats.effect.{Blocker, ContextShift, IO}
import concurrent.ExecutionContext.Implicits.global

object Utils {
  def writeToFile(p: Path, stream: Stream[IO, Array[Byte]]) = {
    implicit val contextShift: ContextShift[IO] = IO.contextShift(global)
    Stream.resource(Blocker[IO]).flatMap { blocker =>
      stream
        .through(
          io.file.writeAll(p, blocker, Seq(StandardOpenOption.CREATE, StandardOpenOption.WRITE))
        )
        .compile
        .drain
    }
  }
}
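The error is probably not a missing import. In fs2 2.x, writeAll is a Pipe[F, Byte, Unit], so it cannot consume Array[Byte] elements directly, and with the element types mismatched the compiler likely cannot pin down F, which would explain the diverging Sync[F] search. Separately, .compile.drain inside the flatMap yields an IO where a Stream is expected. A minimal sketch of one possible fix, assuming fs2 2.x with cats-effect 2 (the object name WriteBytesSketch and the sample data are illustrative):

```scala
import java.nio.file.{Files, Path, StandardOpenOption}
import cats.effect.{Blocker, ContextShift, IO}
import fs2.{Chunk, Stream}
import scala.concurrent.ExecutionContext

object WriteBytesSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  def writeToFile(p: Path, stream: Stream[IO, Array[Byte]]): IO[Unit] =
    Stream
      .resource(Blocker[IO])
      .flatMap { blocker =>
        stream
          // Flatten each array into individual bytes so the element type is Byte
          .flatMap(arr => Stream.chunk(Chunk.bytes(arr)))
          // Fixing F explicitly keeps implicit resolution simple
          .through(
            fs2.io.file.writeAll[IO](p, blocker, Seq(StandardOpenOption.CREATE, StandardOpenOption.WRITE))
          )
      }
      // Compile once, outside the flatMap, to get an IO[Unit]
      .compile
      .drain

  def main(args: Array[String]): Unit = {
    val p = Files.createTempFile("fs2-sketch", ".bin")
    val data = Stream(Array[Byte](1, 2), Array[Byte](3, 4)).covary[IO]
    writeToFile(p, data).unsafeRunSync()
    println(Files.readAllBytes(p).toList) // List(1, 2, 3, 4)
  }
}
```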
Pull. I see it used with uncons.flatMap, but I'm unsure of how its mechanics work in other cases. In particular, I have a List[Pull[F,Byte,Unit]], and calling .sequence on it yields Pull[F,Byte,List[Unit]]. Is that outputting all individual pulls in order? Can I turn it into a Pull[F, Byte, Unit]?
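Since Pull is monadic, sequencing runs the pulls one after another, so their output comes out in list order. To discard the List[Unit] result, calling .void on the sequenced pull (or using sequence_ from cats) should work. The sketch below shows the same idea without any cats syntax, chaining the pulls with >>. It assumes fs2 2.x, and the object name PullSequenceSketch and the sample pulls are illustrative.

```scala
import fs2.{Pull, Pure}

object PullSequenceSketch {
  // Three pulls, each outputting a single byte (hypothetical data)
  val pulls: List[Pull[Pure, Byte, Unit]] =
    List(Pull.output1(1.toByte), Pull.output1(2.toByte), Pull.output1(3.toByte))

  // Chain them left to right; the result type stays Pull[Pure, Byte, Unit]
  val combined: Pull[Pure, Byte, Unit] =
    pulls.foldLeft[Pull[Pure, Byte, Unit]](Pull.done)(_ >> _)

  def main(args: Array[String]): Unit =
    println(combined.stream.toList) // List(1, 2, 3)
}
```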