Fabio Labella
@SystemFw
for which it should be natural to "skip" a failed element and go to the next (continue on error)
but Stream is not a List, it's a monad (so, a kind of tree), which means that subsequent parts of the stream might depend on previous ones, making "continuing" impossible
so you can do two things: if you do handleErrorWith at the Stream level, you can effectively restart
this works well with sources that have some notion of commit/resumption, such as kafka
if you can't do that, you need to go to the F level, and do something like attempt, giving you a Stream[F, Either[Throwable, A]] which you can then filter
and this obviously works for streams that do not exploit the monadic structure, such as Stream.repeatEval(doThis)
and it's the conceptual equivalent of going "inside" the IO that I showed you, and attempting f(a) individually
replace attempt with whatever error handling you want ofc
and that's what you need to do in your case as well, try to handle the error at the Task level
also note that one of your requirements needs some extra machinery, specifically "stop all processing on error"
that works with evalMap, which is sequential, but with parEvalMap a computation might fail while others are in flight, and you need to decide what to do then
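A minimal sketch of the two approaches just described, with a hypothetical doThis action and cats-effect IO:

    import cats.effect.IO
    import fs2.Stream

    // Hypothetical action that sometimes fails.
    def doThis: IO[Int] =
      IO(scala.util.Random.nextInt(10)).flatMap {
        case 0 => IO.raiseError(new RuntimeException("boom"))
        case n => IO.pure(n)
      }

    // 1. Handle at the Stream level: replace the failed stream with a fresh one (a restart).
    def restarting: Stream[IO, Int] =
      Stream.repeatEval(doThis).handleErrorWith(_ => restarting)

    // 2. Handle at the F level: attempt each action individually and filter,
    //    so one failure does not terminate the stream.
    val skippingFailures: Stream[IO, Int] =
      Stream
        .repeatEval(doThis.attempt)        // Stream[IO, Either[Throwable, Int]]
        .collect { case Right(n) => n }    // drop the failed elements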
ybasket
@ybasket
You really have an awesome way of explaining things, @SystemFw – even though I know this piece, I enjoy reading it!
Arjun Dhawan
@arjun-1
Hmm, if I try to make it more concrete: handleErrorWith makes sense if you have the ability to 'restart' your stream, i.e. you can create a new stream from Stream.repeatEval(receive) (where receive gets a message from a broker)
If you don't have this ability, the only way of dealing with errors is to use attempt or deal with your error before you actually 'turn' it into a stream?
ybasket
@ybasket
I guess the "stop all processing on error" requirement could be implemented (give or take in-flight elements) via a Signal (triggered by the retrying F) and pauseWhen just before parEvalMap
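A rough sketch of that idea, with a hypothetical commands source and process action; fs2 3.x / cats-effect 3 names are assumed, and a single naive re-run stands in for a real retry policy:

    import cats.effect.IO
    import cats.syntax.all._
    import fs2.Stream
    import fs2.concurrent.SignallingRef

    def guardedPipeline[A, B](commands: Stream[IO, A], process: A => IO[B]): Stream[IO, B] =
      Stream.eval(SignallingRef[IO, Boolean](false)).flatMap { paused =>
        commands
          .pauseWhen(paused)                // upstream is not pulled while the signal is true
          .parEvalMap(4) { a =>
            process(a).handleErrorWith { _ =>
              // on failure: pause upstream, re-run, resume whatever the outcome
              paused.set(true) *> process(a).guarantee(paused.set(false))
            }
          }
      }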
Fabio Labella
@SystemFw

even though I know this piece, I enjoy reading it!

Thanks :)

If you don't have this ability, the only way of dealing with errors is to use attempt or deal with your error before you actually 'turn' it into a stream?

yeah, at the individual action level

note that it's really the same with IO
@arjun-1 in your specific case, just call the retry action on the IO, rather than after it
then you can either compile.lastOrError (and retain evalMap/parEvalMap), or simply turn those into flatMap/map(..).parJoin
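For reference, a sketch of that shape with plain cats-effect IO and fs2's Stream.retry (hypothetical process action; fs2 3.x parameter names assumed):

    import scala.concurrent.duration._
    import cats.effect.IO
    import fs2.Stream

    // Hypothetical per-command action.
    def process(cmd: String): IO[Int] = IO(cmd.length)

    // Stream.retry yields a Stream, so compile it back down to an IO ("call the
    // retry action on the IO") and keep using evalMap/parEvalMap downstream.
    def processWithRetry(cmd: String): IO[Int] =
      Stream
        .retry(process(cmd), delay = 100.millis, nextDelay = _ * 2, maxAttempts = 5)
        .compile
        .lastOrError

    def pipeline(commands: Stream[IO, String]): Stream[IO, Int] =
      commands.parEvalMap(3)(processWithRetry)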
Arjun Dhawan
@arjun-1
Thanks a lot @SystemFw ! I'm gonna implement your suggestion :)
Fabio Labella
@SystemFw
:+1:
Arjun Dhawan
@arjun-1

It works :). I worked out 2 solutions:

def retryingProcessing(command: Command): Stream[Effect, Result] = Stream.retry(...)
def retryingProcessing2(command: Command): Effect[Result] = processAndLog(command).retry(...)

val pipe: Pipe[Task, Command, Result] = ZIO.runtime
  .map { implicit r =>
    // _.map(retryingProcessing).parJoin(3)
    _.parEvalMap(3)(retryingProcessing2)
  }

but I went with the retry on Task. And the result is even more concise =). Many thanks :pray:

Fabio Labella
@SystemFw
:+1:
David Flynn
@WickedUk_gitlab

Hi, with FS2 I'd like to group only some adjacent entries that meet certain shared criteria. The stream is infinite, so a plain fold won't do the job. Assume the stream data is already parsed into types A, B, C sharing a common trait, and lowercase letters are attribute names:

A(...)
B(a=1, b=2)
B(a=1, b=3)
B(a=1, b=4)
B(a=2, b=1)
C(...)
A(...)
...

I want to group all adjacent Bs together that have the same value of a:

A(...)
BGRP(a=1, bs=List(2, 3, 4))
BGRP(a=2, bs=List(1))
C(...)
A(...)
...

Since a B might have additional attributes, the grouped version BGRP(List(B(a=1, b=2), B(a=1, b=3), B(a=1, b=4))) is acceptable.

Michael Pilquist
@mpilquist
@WickedUk_gitlab groupAdjacentBy does most of the work, though you need a way to tunnel the A and C instances through. Here’s one solution which uses the hashCode of A and C, though hopefully you have something that’s more unique: https://scastie.scala-lang.org/zBX5mXbSRd6LYitbI9TtDA
Actually, this is cleaner and bypasses the issue with needing a unique value for A and C: https://scastie.scala-lang.org/jERxKCGETEWWJIbJyLJJ8w
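A minimal sketch of the groupAdjacentBy approach along the lines of the first Scastie link, with hypothetical encodings of the entries above (field names invented for illustration):

    import cats.kernel.Eq
    import fs2.{Chunk, Pure, Stream}

    sealed trait Entry
    case class A(x: Int) extends Entry
    case class B(a: Int, b: Int) extends Entry
    case class C(y: Int) extends Entry
    case class BGrp(a: Int, bs: List[Int]) extends Entry

    // Key Bs by their `a` value; give every other entry its own "pass-through" key
    // (hashCode here, as in the Scastie example, so ideally something more unique).
    def key(e: Entry): Either[Int, Int] = e match {
      case B(a, _) => Left(a)
      case other   => Right(other.hashCode)
    }

    implicit val eqKey: Eq[Either[Int, Int]] = Eq.fromUniversalEquals

    val input: Stream[Pure, Entry] =
      Stream(A(0), B(1, 2), B(1, 3), B(1, 4), B(2, 1), C(0), A(1))

    val grouped: Stream[Pure, Entry] =
      input.groupAdjacentBy(key).flatMap {
        case (Left(a), bs)    => Stream.emit(BGrp(a, bs.toList.collect { case B(_, b) => b }))
        case (Right(_), rest) => Stream.chunk(rest) // non-B entries pass through unchanged
      }
    // grouped.toList == List(A(0), BGrp(1, List(2, 3, 4)), BGrp(2, List(1)), C(0), A(1))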
Matt Hughes
@matthughes
Not really an fs2 question but I’ve stumbled upon this compiler error a number of times and it leaves me baffled:
    // fs2.Stream takes 2 type parameters, expected: 1
    FlatMap[fs2.Stream].flatten(???)
    // fs2.Stream[fs2.Pure,Int] takes no type parameters, expected: 1
    FlatMap[fs2.Stream[Pure, Int]].flatten(???)
Fabio Labella
@SystemFw
@matthughes you have kind errors in both places
FlatMap wants a * -> *, so things of shape F[_]
Matt Hughes
@matthughes
Yes, but am I the only one who finds that error message circular?
Fabio Labella
@SystemFw
it makes sense to me actually
FlatMap expects things that take 1 type parameter
in one case you give it something that takes 0, in the other one that takes 2
so you fail the target of 1 both times
more precisely, you give it something of kind * one time, and of kind (* -> *) -> * -> * the other time
you need * -> *
David Flynn
@WickedUk_gitlab
@mpilquist That looks like what I'm after, thanks. I'll try to implement it.
Fabio Labella
@SystemFw
or in the ugly scala syntax, you need F[_], and you give it A one time, and G[_[_], _] the other time
Matt Hughes
@matthughes
So … FlatMap[fs2.Stream[Pure, _]].flatten(???)
Fabio Labella
@SystemFw
does that make any sense? I agree that kind errors in scala aren't the clearest, but the two above are coherent (if a bit cryptic at first)
well, that's conceptually right but that syntax unfortunately means something else in scala
Dotty fixes it, and what you have is how you'd write it in dotty
in scala 2 you need FlatMap[Stream[Pure, *]], where * comes from kind projector
you also see it as FlatMap[Stream[Pure, ?]] in older versions of kind projector
your intuition is correct in that you need to partially apply that type, but in scala 2 an underscore in that position means a type of kind * (so shape A), with an existential parameter (a wildcard type _), which is a massive inconsistency syntax-wise, since at the value level _ does indeed mean "partially apply"
but as I said, that's fixed in dotty
Lucas Satabin
@satabin
Alternatively, you can name the type as in type PureStream[T] = Stream[Pure, T] and then use FlatMap[PureStream] if kind-projector is not an option for you
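Putting both spellings together, a quick sketch using IO (so that fs2's MonadError instance for Stream[F, *], which needs an ApplicativeError for F, is available); the first form assumes the kind-projector plugin:

    import cats.FlatMap
    import cats.effect.IO
    import fs2.Stream

    val nested: Stream[IO, Stream[IO, Int]] = Stream(Stream(1, 2), Stream(3))

    // Scala 2 + kind-projector: Stream[IO, *] is the partially applied,
    // * -> * shaped type constructor that FlatMap expects.
    val viaKindProjector: Stream[IO, Int] = FlatMap[Stream[IO, *]].flatten(nested)

    // Without kind-projector: name the partially applied type, as suggested above.
    type IOStream[A] = Stream[IO, A]
    val viaAlias: Stream[IO, Int] = FlatMap[IOStream].flatten(nested)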
Matt Hughes
@matthughes
Right. The error message throws me off because it doesn't mention FlatMap and the "expected" seems to refer back to Stream. It should be: "fs2.Stream[fs2.Pure,Int] takes no type parameters, FlatMap expects: 1"
Fabio Labella
@SystemFw
heh, FlatMap does not expect 1
it expects something that expects 1