Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Feb 01 10:11
    @SystemFw banned @Hudsone_gitlab
  • Jan 31 2019 04:19
    404- forked
    404-/fs2
  • Jan 31 2019 03:01
    SethTisue commented #1232
  • Jan 30 2019 17:22
  • Jan 30 2019 13:45
  • Jan 30 2019 10:48
    pchlupacek commented #1406
  • Jan 30 2019 10:47
    pchlupacek commented #1406
  • Jan 30 2019 10:39
    pchlupacek commented #1407
  • Jan 30 2019 09:58
    lJoublanc commented #870
  • Jan 30 2019 09:42
    vladimir-popov commented #1407
  • Jan 30 2019 08:10
    vladimir-popov closed #1407
  • Jan 30 2019 08:10
    vladimir-popov commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 18:57
    SystemFw commented #1406
  • Jan 29 2019 17:47
    pchlupacek commented #1406
  • Jan 29 2019 17:42
    pchlupacek commented #1406
  • Jan 29 2019 17:39
    pchlupacek commented #1407
  • Jan 29 2019 17:39
    vladimir-popov edited #1407
  • Jan 29 2019 17:38
    vladimir-popov commented #1406
Erlend Hamnaberg
@hamnis
hey all.
isnt fs2.io.file.writeAll supposed to emit a value?
def unzip[F[_]: ContextShift: Sync](blocker: Blocker, stream: Stream[F, Byte], chunksize: Int = chunkSize) = {
    Stream.eval(blocker.delay(Files.createTempDirectory("unzippped"))).flatMap { tempDir =>
      val path = tempDir.resolve("file.zip")
      println(path)
      stream
       .through(fs2.io.file.writeAll(path, blocker))
        .flatMap { _ =>
          println(path)
          Stream.evals(blocker.delay(javaUnzip(path, tempDir, chunksize)))
        }
    }
  }
this always becomes and empty stream.
what am I doing wrong?
Fabio Labella
@SystemFw
it's working as intended, even though it might be confusing
that being said, "and then" in fs2 is expressed by ++, not flatMap(_ => (think about a stream that emits more than one element)
@hamnis
Erlend Hamnaberg
@hamnis
right. thanks
the Stream[F, Nothing] would actually helped here
Erlend Hamnaberg
@hamnis
yeah, I was trying different things here, so might do that as well
Erlend Hamnaberg
@hamnis
just to make sure I understand finalisers in streams, If they are compiled to an IO, they wont run until the IO completes? so if I do stream.compile.toList will that run the finalizers?
Fabio Labella
@SystemFw
I think the questions are independent. Nothing will run until you compile. On the other hand there can be finalisers whose lifetime isn't the lifetime of the entire stream
Erlend Hamnaberg
@hamnis
ok
Arjun Dhawan
@arjun-1

Hi everyone :),

I recently started using fs2 + zio, and there is some behavior of fs2 I have difficulty understanding.

Basically I have a stream (backed by a Queue) which I am processing in parallel, using parEvalMap.
During the processing of an element, some DbError might occur which should:

  • temporarily stop all processing,
  • retry the failing element until it succeeds.
  • after which the remaining stream is processed as usual

This I thought to achieve using the following Pipe:

val pipe: Pipe[Effect, Command, Result] = ZIO.runtime
  .map { implicit r =>


    def pipe(stream: Stream[Effect, Command]): Stream[Effect, Result] =
      stream.parEvalMap(2)(processAndLog).handleErrorWith {
        case error @ DbError(_) =>
          retryProcessing(error) ++ pipe(stream)
      }
    pipe
  }

stream.through(pipe)

with

def retryProcessing(error: DbError): Stream[Effect, Result] =
    Stream.retry[Effect, CompilationResult](
      processAndLog(Command(error.id)),
      10.seconds,
      identity,
      Int.MaxValue
    )

and

def processAndLog(command: Command): Task[Result]

I noticed however, that while the above code does indeed retry the first failing element until it succeeds, the 'remainder' of the stream is not processed any more.
I.e. when offering elements command1, command2, command3 which would all fail at first but succeed with retry later,
I notice that only command1 is processed, and the remainder of the commands is not.

I am convinced that the remainder of the stream should process as well, since when I replace parEvalMap with evalMap that is what actually happens (for this case I can see the corresponding log messages of the remaining commands being processed printed).
Basically, I would expect the same continuation behavior for parEvalMap, as is the case for evalMap.

pool
@hamstakilla
Hello, can i somehow clone stream so i can have multiple consumers see all the values emitted?
ybasket
@ybasket
@hamstakilla Look at broadcast (and its variants)
pool
@hamstakilla
Thanks
Fabio Labella
@SystemFw
@arjun-1 the stream stops with both parEvalMap and evalMap, which is the only possible behaviour since it's a monad. If you want to somehow skip, you need to handleErrorWith at the F level (so processAndLog)
I can give more info as to why that behaviour is the only possible one :)
Arjun Dhawan
@arjun-1

@SystemFw thanks for the suggestion I'll try it!

Although this probably means I have some fundamentally flawed view of how streams 'work' :)
My initial thought was that while a a stream might fail due an error during processing (and further computation halts),
it would be possible to 'continue' the stream where it left off, using the .handleErrorWith. Much alike we can continue computation using the .handleErrorWith on for example the IO monad.

But if I understand correctly, the failure of a stream should somehow be interpreted as a 'hard failure', in the sense that any continuation from it cannot happen?

Fabio Labella
@SystemFw
@arjun-1 so, this is a common question, let me expand
in the IO monad, you cannot "continue" the computation (it depends on how you interpret continue)
let's say you have
for {
 a <- fa
 b <- f(a)
 c <- g(b)
} yield c
and I give you this IO, you cannot modify it (for example it comes from a function call)
and imagine that f(a) fails
can you "continue" to g(b)?
Arjun Dhawan
@arjun-1
Hmm, indeed I cannot... I would only be able to see IO[Whatever type c is]...
Fabio Labella
@SystemFw
right, what you can do is attach another IO
handleErrorWith(nextIO)
and note that this is unavoidable, it's intrinsic in its monadic nature
Arjun Dhawan
@arjun-1
Ahhh, makes sense now!
Fabio Labella
@SystemFw
g depends on the result of f(a), so if f(a) fails there is no a to give to g
now, there is a reason why people get confused with Stream
I think it's because you intuitively think of Stream as a fancy List[F[A]]
for which it should be natural to "skip" a failed element and go the next (continue on error)
but Stream is not a List, it's a monad (so, a kind of tree), which means that subsequent parts of the stream might depend on previous ones, making "continuing" impossible
so you can do two things: if you do handleErrorWith at the Stream level, you can effectively restart
this works well with sources that have some notion of commit/resumption, such as kafka
if you can't do that, you need to go at the F level, and do something like attempt, giving you a Stream[F, Either[Throwable, A]] which you can then filter out
and obviously works for streams that do not exploit monadic structure, such as Stream.repeatEval(doThis)
and it's the conceptual equivalent of going "inside" that IOI showed you, and attempt f(a) individually
replace attempt with whatever error handling you want ofc
and that's what you need to do in your case as well, try to handle the error at the Task level
also note that one of your requirements requires some extra machinery, specifically "stop all processing on error"
Fabio Labella
@SystemFw
that works with evalMap, which is sequential, but with parEvalMap a computation might fail while others are in flight, and you need to decide what to do then
ybasket
@ybasket
You really have an awesome way of explaining @SystemFw – even though I know this piece I enjoy reading!
Arjun Dhawan
@arjun-1
Hmm, if I try to make it more concrete: handleErrorWith makes sense if you have the ability to 'restart' your stream, i.e. you can create a new stream from Stream.repeatEval(receive) (where receive gets a message from a broker)
If you don't have this ability, the only way of dealing with errors is to use attempt or deal with your error before you actually 'turn' it into a stream?