Most discussion is on Typelevel Discord: https://discord.gg/bSQBZA3Ced
wcApp < testInputStream >? target
and I'm surprised that it returns a Pipe, since I have provided my concrete implementation in terms of the test input being passed in. If a Pipe is a stream => stream, then I assume I could do stream.flatMap(pipe) to transition my data from A -> B?
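For reference, a minimal sketch of how a Pipe is applied. Pipe[F, A, B] is a type alias for Stream[F, A] => Stream[F, B], so it is applied to the whole stream with through (or called directly as a function) rather than with flatMap, which expects a function from a single element to a Stream. The wordLengths pipe and the sample values are made up for illustration.

```scala
import fs2.{Pipe, Pure, Stream}

object PipeSketch {
  // Hypothetical example pipe: maps each word to its length.
  // Pipe[F, A, B] is an alias for Stream[F, A] => Stream[F, B].
  val wordLengths: Pipe[Pure, String, Int] = in => in.map(_.length)

  // `through` hands the whole stream to the pipe.
  val viaThrough: List[Int] =
    Stream("a", "bb", "ccc").through(wordLengths).toList

  // Because a Pipe is a plain function, calling it directly is equivalent.
  val viaApply: List[Int] =
    wordLengths(Stream("a", "bb", "ccc")).toList
}
```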
@sergeda it somewhat depends on if these have any dependencies between them. but basically, you could make everything an F (you'd compile.something the Stream) or make everything a Stream (using Stream.eval to lift the F into Stream).
once they are the same monad, you can compose them with a for, flatMap, mapN, parMapN, etc.
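A small sketch of both options, assuming cats-effect 3 and fs2 3. The values fromF and fromStream are placeholders, not anything from the question.

```scala
import cats.effect.IO
import cats.syntax.all._
import fs2.Stream

object UnifySketch {
  // Placeholder inputs: one plain effect and one stream.
  val fromF: IO[Int] = IO.pure(1)
  val fromStream: Stream[IO, Int] = Stream(2, 3).covary[IO]

  // Option 1: make everything an IO by compiling the stream,
  // then combine the two effects with mapN.
  val asIO: IO[Int] =
    (fromF, fromStream.compile.foldMonoid).mapN(_ + _)

  // Option 2: make everything a Stream by lifting the effect
  // with Stream.eval, then combine in a for-comprehension.
  val asStream: Stream[IO, Int] =
    for {
      a <- Stream.eval(fromF)
      b <- fromStream
    } yield a + b
}
```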
Hello guys, I have a question about the parallel processing of streams with a discriminator. Here's an example:
override def run(args: List[String]): IO[ExitCode] =
  Random.scalaUtilRandom[IO].flatMap { implicit random =>
    val flat = Stream(
      ("a", 1),
      ("a", 2),
      ("a", 3),
      ("b", 1),
      ("b", 2),
      ("b", 3),
      ("c", 1),
      ("c", 2),
      ("c", 3)
    ).covary[IO]
    val a = flat.filter { case (k, _) => k === "a" }.through(rndDelay)
    val b = flat.filter { case (k, _) => k === "b" }.through(rndDelay)
    val c = flat.filter { case (k, _) => k === "c" }.through(rndDelay)
    val nested = Stream(a, b, c)
    nested.parJoin(100).printlns.compile.drain.as(ExitCode.Success)
  }

def rndDelay[F[_]: Monad: Random: Temporal, A]: Pipe[F, A, A] =
  in =>
    in.evalMap { v =>
      (Random[F].nextDouble.map(_.seconds) >>= Temporal[F].sleep) >> Applicative[F].pure(v)
    }
My input stream looks a lot like the flat stream: basically items with some discriminator. I would like to eval items with the same discriminator sequentially, but items with different discriminators in parallel. In the example above, there are only 3 different values for the discriminator ("a", "b" and "c"); however, in my real-world use case, there's an unknown number of these values. How can I do this in fs2?
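One possible approach, sketched under assumptions rather than offered as the canonical answer: route each item to one of a fixed number of shards by the hash of its discriminator, and run one sequential worker per shard. Items with the same key always land on the same shard, so they are processed in order, while different shards run in parallel. The name parByKey, the shard count, and the channel capacity of 16 are all made up for this sketch. Two different keys can hash to the same shard, in which case they are simply serialized with each other.

```scala
import cats.effect.Concurrent
import cats.syntax.all._
import fs2.{Pipe, Stream}
import fs2.concurrent.Channel

object KeyedParallelism {
  // Hypothetical helper: sequential per key, parallel across shards.
  def parByKey[F[_]: Concurrent, K, A, B](shards: Int)(key: A => K)(
      f: A => F[B]
  ): Pipe[F, A, B] =
    in =>
      // One bounded channel per shard (capacity 16 is an arbitrary choice).
      Stream.eval(Channel.bounded[F, A](16).replicateA(shards)).flatMap { list =>
        val channels = list.toVector

        // Router: send each item to the shard chosen by its key's hash,
        // and close every channel once the input ends (or fails).
        val route: Stream[F, Nothing] =
          in.foreach { a =>
            channels(Math.floorMod(key(a).##, shards)).send(a).void
          }.onFinalize(channels.traverse_(_.close.void))

        // Workers: each shard is consumed sequentially; shards run in parallel.
        val workers: Stream[F, B] =
          Stream.emits(channels).covary[F].map(_.stream.evalMap(f)).parJoinUnbounded

        workers.concurrently(route)
      }
}
```

With the example above this could be used as flat.through(KeyedParallelism.parByKey(8)(_._1)(process)), where process is whatever effect should run per item.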
gzip or bzip2? For gzip, it feels a bit awkward, but I was able to do fs2.compression.gunzip().map(_.flatMap(_.content)). For bzip2, however, I can't find any support in fs2. The best I could find is Apache Commons, which has an InputStream that can wrap the file input stream. In fs2, I would then have to convert the fs2 Stream to an InputStream (I haven't seen any built-in method for that), wrap that with the bzip2 InputStream, and then read that InputStream back into an fs2 Stream. Is there something better I can do?
toInputStream can be used to get the InputStream out of the fs2 stream. The main concern was about bzip2 though, is there a better way than to convert back and forth with an InputStream to apply the bzip2 InputStream decorator from apache commons?
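A sketch of the round trip described above, assuming fs2 3 and Apache Commons Compress on the classpath. It does not answer whether a better way exists; it only shows the InputStream detour wrapped up as a Pipe. The name bunzip2 and the chunkSize parameter are choices made for this sketch.

```scala
import java.io.InputStream

import cats.effect.Async
import fs2.{Pipe, Stream}
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream

object Bzip2Sketch {
  // Hypothetical pipe: bzip2-compressed bytes in, decompressed bytes out.
  def bunzip2[F[_]: Async](chunkSize: Int): Pipe[F, Byte, Byte] =
    in =>
      // Expose the fs2 stream as a java.io.InputStream (closed by the resource).
      Stream.resource(fs2.io.toInputStreamResource(in)).flatMap { raw =>
        fs2.io.readInputStream(
          // The constructor reads the bzip2 header, so it is treated as blocking.
          Async[F].blocking[InputStream](new BZip2CompressorInputStream(raw)),
          chunkSize,
          // The underlying stream is already closed by the resource above.
          closeAfterUse = false
        )
      }
}
```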
hey :wave: I have a stream that emits some events
I'd like to merge it with a second stream that will emit only when the first will not emit for some time
i.e.
sealed trait Foo
case object Bar extends Foo
case object Baz extends Foo
val timed = Stream.awakeEvery(10.seconds).map(_ => Baz)
val events$: Pipe[IO, Foo, Foo] = in$ => ???
I'd like to pass-through in$ and if there is no emit for i.e. 20 seconds, switch to timed until next event emits on in$ (ignore timed otherwise)
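One way this might be sketched is with switchMap: every incoming event starts a fresh inner stream that emits the event, stays silent for the idle period, and then falls back to the timed stream. The next incoming event cancels that inner stream and starts a new one. The name withIdleFallback and its parameters are invented here. Two caveats: nothing is emitted before the first event arrives, and because switchMap interrupts the previous inner stream, an event could in principle be cut off if the next one arrives before it has been passed downstream.

```scala
import cats.effect.IO
import fs2.{Pipe, Stream}
import scala.concurrent.duration._

object IdleFallback {
  sealed trait Foo
  case object Bar extends Foo
  case object Baz extends Foo

  // Hypothetical pipe: passes events through and, after `idle` without a new
  // event, emits Baz every `every` until the next event arrives.
  def withIdleFallback(idle: FiniteDuration, every: FiniteDuration): Pipe[IO, Foo, Foo] =
    in =>
      in.switchMap { event =>
        Stream.emit(event) ++
          Stream.sleep_[IO](idle) ++
          Stream.awakeEvery[IO](every).map(_ => (Baz: Foo))
      }
}
```

For the case in the question this would be in$.through(IdleFallback.withIdleFallback(20.seconds, 10.seconds)). Note that the first Baz then arrives one timed interval after the idle period has elapsed.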
.mp4 in this case) and streams the data the file contains (some T). Actually trying to write the code quickly made me realise Stream[F, Byte] is not my friend here (emphasis on the Stream part). I guess the reason has to do with the fact that I'm doing a stateful process on a stream.
produce(..).flatten on the producer
Queue or Channel, but I would assume you only want to return an http response when the write to Kafka completes
Hi everybody, I have a question that I posted in doobie's channel but I think it is worth posting it here.
I have code related to streaming content from the db:
def f(a: fs2.Stream[cats.effect.IO, String], b: fs2.Stream[cats.effect.IO, String]): Unit =
  (a ++ b).map(println).compile.drain.unsafeRunSync()
Where a comes from doobie (it is streaming data from the database) and b is an infinite stream generated from another source. The problem is that the database connection is not released until the infinite stream gets closed, producing a connection leak.
On the other hand, if I add a dummy finalizer to a, the db connection gets released just after consuming the last item from a:
def f(a: fs2.Stream[cats.effect.IO, String], b: fs2.Stream[cats.effect.IO, String]): Unit =
  (a.onFinalize(cats.effect.IO.unit) ++ b).map(println).compile.drain.unsafeRunSync()
There is likely something going on that I don't understand but I couldn't find it documented.
Any insights?
hi, we're seeing a change in behaviour of groupWithin after the timeout is exceeded between fs2 2.5.10 and fs2 3 and above
import cats.effect._
import fs2._
import scala.concurrent.duration.DurationInt

object FS2Playground extends IOApp.Simple {
  override def run: IO[Unit] = {
    val s =
      Stream.sleep[IO](3.second).flatMap(_ => Stream.emits(Seq(1, 2))).groupWithin(10, 2.seconds).map(c => println(c.toList))
    for {
      _ <- s.compile.drain
    } yield ()
  }
}
fs2 2.5.10 output:
List(1, 2)
fs2 3.2.4 output:
List(1)
List(2)
i.e. notice the sleep time is > the groupWithin timeout - in later versions it immediately emits an element in this case, in 2.5.10 the timeout "resets"
is this expected? and is there a way to recreate 2.5.10 behaviour in fs2 3?
n events from a Stream but would like to not wait forever for upstream to fill the buffer completely. I'd rather have some timeout that would then pass a non-full buffer. Is there already something for this, or how would you go about it? Thanks!
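This sounds like what groupWithin (discussed in the messages above) is for: it emits a Chunk once n elements have accumulated or once the timeout elapses, whichever comes first. A minimal sketch, with a made-up source of five integers:

```scala
import cats.effect.IO
import fs2.{Chunk, Stream}
import scala.concurrent.duration._

object BufferWithTimeout {
  // Groups a placeholder source into chunks of at most `n` elements,
  // flushing a partial chunk when `timeout` elapses.
  def batches(n: Int, timeout: FiniteDuration): Stream[IO, Chunk[Int]] =
    Stream.range(0, 5).covary[IO].groupWithin(n, timeout)
}
```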
Hello everyone 👋, I'm trying to implement some sort of TCP connection class based on fs2-io:
It should have such properties:
trait UserConnection[F[_], Enc[_], Dec[_]]:
  def nextMessage[T: Dec]: F[T]
  def sendMessage[T: Enc](msg: T): F[Unit]
  def closeConnection: F[Unit]
And the current implementation is this:
class TcpUserConnection[F[_]: Concurrent](socket: Socket[F])
    extends UserConnection[F, Encoder, Decoder]:
  def nextMessage[T: Decoder]: F[T] =
    socket.reads
      .through(text.utf8.decode)
      .through(text.lines)
      .map(decode[T])
      .head
      .compile
      .last
      .flatMap {
        case Some(Right(value)) => value.pure[F]
        case Some(Left(error)) => error.raiseError[F, T]
        case None => NothingReceivedException.raiseError[F, T]
      }

  def sendMessage[T: Encoder](msg: T): F[Unit] =
    Stream(msg)
      .map(_.asJson.noSpaces)
      .interleave(Stream.constant("\n"))
      .through(text.utf8.encode)
      .through(socket.writes)
      .compile
      .drain

  def closeConnection: F[Unit] = socket.endOfInput >> socket.endOfOutput
The problem I'm experiencing here is that I cannot read the message which was sent; I'm getting ClosedChannelException.
I have this test snippet here, which reproduces the error:
import cats.effect.IOApp
import cats.syntax.all.*
import fs2.io.net.Network
import cats.effect.IO
import com.comcast.ip4s.{Host, Port}
import com.comcast.ip4s.SocketAddress
import scala.concurrent.duration.*

object Test extends IOApp.Simple:
  val server = Network[IO]
    .server(Host fromString "localhost", Port fromString "12345")
    .map(socket => TcpUserConnection(socket))
    .parEvalMapUnordered(100)(c =>
      IO(println("Waiting for message on server")) >>
        c.nextMessage[AuthForm].map(form => println(form)) >>
        IO(println("Received message on server"))
    )
    .compile
    .drain >>
    IO(println("Finished server !!!"))

  val client =
    Network[IO]
      .client(SocketAddress(Host.fromString("localhost").get, Port.fromString("12345").get))
      .map(TcpUserConnection(_))
      .use { c =>
        IO.sleep(5.seconds) >>
          IO(println("Sending message from client")) >>
          c.sendMessage[AuthForm](AuthForm(Some("user"), Some("pass"))) >>
          IO(println("Sent from client"))
      } >> IO(println("Finished client!!!"))

  val run: IO[Unit] =
    for
      serverFiber <- server.start.onError(exp => IO(println(s"Server exp: $exp")))
      _ = println("Started server")
      _ <- IO.sleep(2.seconds)
      clientFiber <- client.start.onError(exp => IO(println(s"Client exp: $exp")))
      _ = println("Started client")
      fs <- serverFiber.join
      fc <- clientFiber.join
      res = (fs, fc)
      _ = println(s"Result: $fs | $fc")
    yield ()
The run result:
Started server
Started client
Waiting for message on server
Sending message from client
Sent from client
Finished client!!!
Result: Errored(java.nio.channels.ClosedChannelException) | Succeeded(IO(()))
What am I doing wrong?
Is it even possible to use fs2-io with such semantics of "sendMessage <-> nextMessage", or is it closing the TCP channel after completion of .reads or .writes?
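A likely explanation, offered as a hedged guess rather than a confirmed diagnosis: each Socket emitted by Network[F].server is a resource scoped to the server stream, so it may be closed as soon as the outer stream moves on to the next connection. parEvalMapUnordered starts the effect and lets the outer stream advance, which could close the socket while nextMessage is still waiting. The usual pattern is to map each socket to a Stream and run those with parJoin, which keeps the socket open for as long as its handler stream runs. A minimal sketch of that pattern, with a made-up line-based handler in place of the JSON decoding:

```scala
import cats.effect.IO
import com.comcast.ip4s.Port
import fs2.{Stream, text}
import fs2.io.net.{Network, Socket}

object LineServerSketch {
  // Hypothetical handler: emits every UTF-8 line received on the socket.
  def handle(socket: Socket[IO]): Stream[IO, String] =
    socket.reads
      .through(text.utf8.decode)
      .through(text.lines)

  // Each connection is handled as an inner stream; parJoin runs up to 100
  // of them concurrently and keeps each socket open until its handler ends.
  def server(port: Port): Stream[IO, String] =
    Network[IO]
      .server(port = Some(port))
      .map(handle)
      .parJoin(100)
}
```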
def build(killSwitch: SignallingRef[IO, Boolean], q: Queue[IO, String]): Stream[IO, Unit] = {
  val dbApp =
    Stream
      .awakeEvery[IO](Random.nextInt(10).seconds)
      .evalMap(_ => Console[IO].println("GET FROM DB").as(UUID.randomUUID().toString))
      .evalMap(q.tryOffer(_).flatTap(killSwitch.set))
      .evalTap(_ => killSwitch.get.flatMap(v => Console[IO].println(s"DONE WITH $v")))
  val flinkApp =
    Stream
      .eval(Console[IO].println("APP START") >> IO.sleep(100.seconds) >> Console[IO].println("APP DONE"))
  Stream(dbApp, flinkApp).parJoinUnbounded.drain
}

val program = for {
  killSwitch <- Stream.eval(SignallingRef[IO, Boolean](false))
  queue <- Stream.eval(Queue.unbounded[IO, String])
  _ <- Stream.eval(Console[IO].println("BEFORE PROGRAM"))
  result <- Stream.eval(queue.take) concurrently
    build(killSwitch, queue)
      .flatMap(_ => Stream.eval(Console[IO].println("AFTER PROGRAM")))
      .interruptWhen(killSwitch)
      .drain
} yield result
val result = program.compile.lastOrError.unsafeRunSync()
println(result)
}
flinkAppStream = Stream.eval(IO(FlinkAppEntry.main(Array.empty)))