Most discussion is on Typelevel Discord: https://discord.gg/bSQBZA3Ced
If you don't have this ability, the only way of dealing with errors is to use attempt or deal with your error before you actually 'turn' it into a stream?
yeah, at the individual action level
IO
IO
, rather than after it
compile.lastOrError (and retain evalMap/parEvalMap), or simply turn those into flatMap/map(..).parJoin
It works :). I worked out 2 solutions:
def retryingProcessing(command: Command): Stream[Effect, Result] = Stream.retry(...)
def retryingProcessing2(command: Command): Effect[Result] = processAndLog(command).retry(...)
val pipe: Pipe[Task, Command, Result] = ZIO.runtime
.map { implicit r =>
// _.map(retryingProcessing).parJoin(3)
_.parEvalMap(3)(retryingProcessing2)
}
but went with the retry on Task. And the result is even more concise =). Many thanks :pray:
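For anyone reading along later, here is a rough sketch of the two shapes discussed above, written against cats-effect IO rather than ZIO Task. It assumes fs2 3.x and cats-effect 3; `Command`, `Result`, `process`, and the hand-rolled `retrying` helper are hypothetical stand-ins, not the code from the conversation.

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

object RetryPerElement extends IOApp.Simple {
  // Hypothetical stand-ins for the types in the conversation.
  final case class Command(id: Int)
  final case class Result(id: Int)

  // Hypothetical action; a real one would do the processing and logging.
  def process(c: Command): IO[Result] = IO(Result(c.id))

  // Hand-rolled retry on the individual action: re-run `io` until it
  // succeeds or `attempts` runs have failed, sleeping between runs.
  def retrying[A](io: IO[A], attempts: Int, delay: FiniteDuration): IO[A] =
    io.handleErrorWith { e =>
      if (attempts > 1) IO.sleep(delay) *> retrying(io, attempts - 1, delay)
      else IO.raiseError(e)
    }

  val commands: Stream[IO, Command] =
    Stream.emits(List(Command(1), Command(2), Command(3))).covary[IO]

  // Variant 1: retry inside the effect, keep parEvalMap.
  val viaParEvalMap: Stream[IO, Result] =
    commands.parEvalMap(3)(c => retrying(process(c), 3, 100.millis))

  // Variant 2: one retrying inner stream per element, joined concurrently.
  val viaParJoin: Stream[IO, Result] =
    commands
      .map(c => Stream.retry(process(c), 100.millis, identity, 3))
      .parJoin(3)

  val run: IO[Unit] =
    viaParEvalMap.evalMap(r => IO.println(r)).compile.drain
}
```

In both variants a failure is handled per element, before it can terminate the outer stream.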
Hi, with FS2 I'd like to group only some adjacent entries that meet certain shared criteria. The stream is infinite, so a plain fold won't do the job. Assume the stream data is already parsed into types A, B, C sharing a common trait; lower-case letters are attribute names:
A(...)
B(a=1, b=2)
B(a=1, b=3)
B(a=1, b=4)
B(a=2, b=1)
C(...)
A(...)
...
I want to group all adjacent Bs together that have the same value of a:
A(...)
BGRP(a=1, bs=List(2, 3, 4))
BGRP(a=2, bs=List(1))
C(...)
A(...)
...
Since a B might have additional attributes, the grouped version BGRP(List(B(a=1, b=2), B(a=1, b=3), B(a=1, b=4))) is acceptable.
groupAdjacentBy does most of the work, though you need a way to tunnel the A and C instances through. Here’s one solution which uses the hashCode of A and C, though hopefully you have something that’s more unique: https://scastie.scala-lang.org/zBX5mXbSRd6LYitbI9TtDA
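A minimal sketch of that idea, assuming fs2 3.x. This is not the code behind the Scastie link: instead of hashCode it uses the element's position from zipWithIndex as the "tunnel" key, so two adjacent non-B entries can never be merged. The `Entry` hierarchy, `BGrp`, and `groupBs` are hypothetical names for illustration.

```scala
import cats.Eq
import fs2.{Chunk, Stream}

object GroupAdjacentBs {
  // Hypothetical model of the parsed stream data.
  sealed trait Entry
  final case class A(label: String) extends Entry
  final case class B(a: Int, b: Int) extends Entry
  final case class C(label: String) extends Entry
  final case class BGrp(a: Int, bs: List[Int]) extends Entry

  def groupBs[F[_]](in: Stream[F, Entry]): Stream[F, Entry] =
    in.zipWithIndex
      // Bs are keyed by their `a`; everything else gets a key unique
      // to its position, so it always ends up in a group of its own.
      .groupAdjacentBy[Either[Long, Int]] {
        case (B(a, _), _) => Right(a)
        case (_, idx)     => Left(idx)
      }(Eq.fromUniversalEquals)
      .map[Chunk[Entry]] {
        case (Right(a), chunk) =>
          Chunk.singleton(BGrp(a, chunk.toList.collect { case (B(_, b), _) => b }))
        case (Left(_), chunk) =>
          chunk.map(_._1) // pass A and C through unchanged
      }
      .flatMap(c => Stream.chunk(c))

  // Pure demo input mirroring the example in the question.
  val demo: List[Entry] =
    groupBs(Stream(A("x"), B(1, 2), B(1, 3), B(1, 4), B(2, 1), C("y"), A("z"))).toList
}
```

Note that a group is only emitted once an element with a different key arrives (or the stream ends), which is what makes this workable on an infinite stream.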
// fs2.Stream takes 2 type parameters, expected: 1
FlatMap[fs2.Stream].flatten(???)
// fs2.Stream[fs2.Pure,Int] takes no type parameters, expected: 1
FlatMap[fs2.Stream[Pure, Int]].flatten(???)
* -> *, so things of shape F[_]
* one time, and of kind (* -> *) -> * -> * the other time
* -> *
FlatMap[Stream[Pure, *]], where * comes from kind projector
FlatMap[Stream[Pure, ?]] in older versions of kind projector
* (so shape A), with an existential parameter (a wildcard type _), which is a massive inconsistency syntax-wise, since at the value level _ does indeed mean "partially apply"
* (pronounced star or type) is the kind of types that require no extra info to be instantiated, so A, Int, Stream[IO, Int], Stream[F, Int] etc
F[_] requires a type of kind * (an A), and produces another type of kind * (F[A])
F[_] has kind * -> *
Either requires two types of kind star and produces types of kind star, so the kind of Either is * -> * -> * (it's curried)
* -> * -> *
Stream[Int, Int], which is incorrect
* -> *, and the second kind *
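To make the kinds discussion concrete without pulling in cats or kind projector, here is a small dependency-free sketch. `Flatten` is a hypothetical stand-in for a type class like FlatMap that expects an F[_], and `Wrapped` is a hypothetical stand-in for a type shaped like Stream (first parameter of kind * -> *, second of kind *). Plain type aliases play the role that Stream[Pure, *] plays with kind projector.

```scala
object Kinds {
  // A type class over things of kind * -> * (shape F[_]).
  trait Flatten[F[_]] {
    def flatten[A](ffa: F[F[A]]): F[A]
  }
  object Flatten {
    def apply[F[_]](implicit ev: Flatten[F]): Flatten[F] = ev
  }

  // List already has kind * -> *, so it fits directly.
  implicit val listFlatten: Flatten[List] = new Flatten[List] {
    def flatten[A](ffa: List[List[A]]): List[A] = ffa.flatten
  }

  // Either has kind * -> * -> *; fixing the left side gives * -> *.
  type OrError[A] = Either[String, A]
  implicit val orErrorFlatten: Flatten[OrError] = new Flatten[OrError] {
    def flatten[A](ffa: OrError[OrError[A]]): OrError[A] = ffa.flatMap(x => x)
  }

  // Wrapped has kind (* -> *) -> * -> *, like Stream.
  final case class Wrapped[F[_], A](value: F[A])
  // Fixing the first parameter gives * -> *, like Stream[Pure, *].
  type ListWrapped[A] = Wrapped[List, A]
  implicit val listWrappedFlatten: Flatten[ListWrapped] = new Flatten[ListWrapped] {
    def flatten[A](ffa: ListWrapped[ListWrapped[A]]): ListWrapped[A] =
      Wrapped[List, A](ffa.value.flatMap(_.value))
  }

  // Rejected by the compiler, for the reasons discussed above:
  //   Flatten[Either]              // kind * -> * -> *, expected * -> *
  //   Flatten[Wrapped]             // kind (* -> *) -> * -> *
  //   Flatten[Wrapped[List, Int]]  // kind *

  val a: List[Int] = Flatten[List].flatten(List(List(1), List(2, 3)))
  val b: OrError[Int] = Flatten[OrError].flatten(Right(Right(42)))
  val c: ListWrapped[Int] = Flatten[ListWrapped].flatten(
    Wrapped[List, ListWrapped[Int]](
      List(Wrapped[List, Int](List(1)), Wrapped[List, Int](List(2, 3)))
    )
  )
}
```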