When I use `queue.dequeue.evalScan(0)(...)`, the initial state (0) is not emitted until the first item is available on the queue. However, when I use `evalScan` without a queue, or when I use a regular `scan` on a queue, the initial state is emitted immediately. Is this expected?
Probably the `scan` one, but the inconsistency sounds annoying regardless.
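For comparison, `scanLeft` on plain collections behaves the way the question expects `evalScan` to: the initial state is emitted up front, before any input element is consumed. A minimal standard-library sketch (hypothetical stand-in for a stream, no fs2 involved):

```scala
// scanLeft emits the initial state first, even before the first element
// arrives -- and even when there are no elements at all.
object ScanDemo extends App {
  val out = List(1, 2, 3).scanLeft(0)(_ + _)
  println(out)    // List(0, 1, 3, 6): the initial 0 comes first

  val empty = List.empty[Int].scanLeft(0)(_ + _)
  println(empty)  // List(0): initial state emitted with no input at all
}
```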
Is `fs2.io.file.writeAll` supposed to emit a value?
```scala
def unzip[F[_]: ContextShift: Sync](blocker: Blocker, stream: Stream[F, Byte], chunksize: Int = chunkSize) = {
  Stream.eval(blocker.delay(Files.createTempDirectory("unzippped"))).flatMap { tempDir =>
    val path = tempDir.resolve("file.zip")
    println(path)
    stream
      .through(fs2.io.file.writeAll(path, blocker))
      .flatMap { _ =>
        println(path)
        Stream.evals(blocker.delay(javaUnzip(path, tempDir, chunksize)))
      }
  }
}
```
this always becomes an empty stream.
You want `++`, not `flatMap(_ =>` (think about a stream that emits more than one element). A return type of `Stream[F, Nothing]` would actually have helped here.
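A standard-library analogy for why the `flatMap` version yields an empty stream: if `writeAll`'s output emits no elements (like an empty collection), `flatMap` has nothing to map over, whereas `++` runs the second part after the first completes. A sketch using `List` as a hypothetical stand-in for a stream:

```scala
// writeResult stands in for writeAll's output, which emits no elements.
object AppendDemo extends App {
  val writeResult: List[Unit] = Nil

  // flatMap's function is never invoked: there are no elements to map over.
  println(writeResult.flatMap(_ => List(1, 2, 3)))  // List()

  // ++ sequences the second part after the first part is done.
  println(writeResult ++ List(1, 2, 3))             // List(1, 2, 3)
}
```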
Hi everyone :),
I recently started using fs2 + zio, and there is some behavior of fs2 I have difficulty understanding.
Basically I have a stream (backed by a `Queue`) which I am processing in parallel, using `parEvalMap`.
During the processing of an element, some `DbError` might occur which should:
This I thought to achieve using the following `Pipe`:
```scala
val pipe: Pipe[Effect, Command, Result] = ZIO.runtime
  .map { implicit r =>
    def pipe(stream: Stream[Effect, Command]): Stream[Effect, Result] =
      stream.parEvalMap(2)(processAndLog).handleErrorWith {
        case error @ DbError(_) =>
          retryProcessing(error) ++ pipe(stream)
      }
    pipe
  }

stream.through(pipe)
```
with
```scala
def retryProcessing(error: DbError): Stream[Effect, Result] =
  Stream.retry[Effect, CompilationResult](
    processAndLog(Command(error.id)),
    10.seconds,
    identity,
    Int.MaxValue
  )
```
and
```scala
def processAndLog(command: Command): Task[Result]
```
I noticed, however, that while the above code does indeed retry the first failing element until it succeeds, the 'remainder' of the stream is not processed any more.
I.e. when offering elements `command1`, `command2`, `command3`, which would all fail at first but succeed on retry later, I notice that only `command1` is processed; the remaining commands are not.
I am convinced that the remainder of the stream should be processed as well, since when I replace `parEvalMap` with `evalMap`, that is what actually happens (in that case I can see the corresponding log messages of the remaining commands being processed).
Basically, I would expect the same continuation behavior for `parEvalMap` as for `evalMap`.
@SystemFw thanks for the suggestion, I'll try it!
Although this probably means I have some fundamentally flawed view of how streams 'work' :)
My initial thought was that while a stream might fail due to an error during processing (and further computation halts),
it would be possible to 'continue' the stream where it left off using `.handleErrorWith`, much like we can continue computation using `.handleErrorWith` on, for example, the `IO` monad.
But if I understand correctly, the failure of a stream should somehow be interpreted as a 'hard failure', in the sense that no continuation from it can happen?
Even with the `IO` monad, you cannot "continue" the computation (it depends on how you interpret "continue"). Take:
```scala
for {
  a <- fa
  b <- f(a)
  c <- g(b)
} yield c
```
Say you have that `IO` and you cannot modify it (for example it comes from a function call). If `f(a)` fails, can you continue with `g(b)`? No, you can only replace the whole computation with `handleErrorWith(nextIO)`.
But a `Stream` is more like a `List[F[A]]`: with `handleErrorWith` at the `Stream` level, you can effectively restart.
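The point about monadic error handling can be illustrated with `scala.util.Try` from the standard library (a hypothetical stand-in for `IO`): recovery replaces the entire failed computation, it does not resume at the step after the failure.

```scala
import scala.util.{Success, Try}

// A for-comprehension like the one above: once f(a) fails,
// g(b) is never reached, and recover replaces the whole result.
object RecoverDemo extends App {
  val fa        = Try(1)
  def f(a: Int) = Try[Int](throw new RuntimeException("f(a) fails"))
  def g(b: Int) = Try(b + 1) // never reached once f(a) fails

  val result = for {
    a <- fa
    b <- f(a)
    c <- g(b)
  } yield c

  // recover does not "continue" at g(b); it substitutes a whole new outcome.
  val recovered = result.recover { case _ => 42 }
  println(recovered) // Success(42)
}
```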