Phillip Taylor
So how do I turn my target, which is a Pipe[IO, Byte, String], into an fs2.Stream[IO, Byte]?
Adam Rosien
Pipe[F, A, B] is just a function Stream[F, A] => Stream[F, B], so it could only produce a Stream[IO, String]
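A tiny sketch of that equivalence, assuming fs2 3.x and cats-effect 3:

```scala
import cats.effect.IO
import fs2.{Pipe, Stream}

// Pipe[F, A, B] is a type alias for Stream[F, A] => Stream[F, B]
val toUpper: Pipe[IO, String, String] = _.map(_.toUpperCase)

// `through(pipe)` is just function application:
val viaThrough: Stream[IO, String] = Stream("a", "b").covary[IO].through(toUpper)
val viaApply: Stream[IO, String]   = toUpper(Stream("a", "b"))
// both compile.toList to List("A", "B")
```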
Phillip Taylor
I've used wcApp < testInputStream >? target and I'm surprised that it returns a Pipe, since I have provided my concrete implementation in terms of the test input being passed in. If a Pipe is a stream => stream, then I assume I could do stream.flatMap(pipe) to transition my data from A -> B?
I know that you're supposed to use stream.through(pipe), but I'm not sure why I get a Pipe in the first place
I'm watching all the videos on the fs2 documentation page now, hoping one will help me
Phillip Taylor
looks like ioApp.run().map { _.output.toList } gives me something I can build a stream from
Let me see if I can fall ass-backwards through the rest of this app lol
Adam Rosien
i don't know the api very well, but the prox stuff builds something that specifies the "wiring" of various processes, which you then run to get an effect. looking at the prox fs2 docs, run produces an F, which in this case is IO. you want a stream? what are you trying to do?
Serhii Dashko
I have some methods that return just F[Result], some that return F[Either[Error, Result]], and a BlazeServerBuilder that returns Stream[F, ExitCode]. What is the correct way to deal with this situation? How can I combine everything in one for comprehension?
Adam Rosien

@sergeda it somewhat depends on whether these have any dependencies between them. but basically, you could make everything an F (you'd compile.something the Stream) or make everything a Stream (using Stream.eval to lift the F into Stream).

once they are the same monad, you can compose them with a for, flatMap, mapN, parMapN, etc.
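A rough sketch of both options; the names (result, checked, server) are made-up stand-ins for the shapes in the question, assuming fs2 3.x:

```scala
import cats.effect.{ExitCode, IO}
import fs2.Stream

// Hypothetical stand-ins for the shapes in the question:
def result: IO[Int]                  = IO.pure(42)
def checked: IO[Either[String, Int]] = IO.pure(Right(1))
def server: Stream[IO, ExitCode]     = Stream.emit(ExitCode.Success)

// Option 1: lift everything into Stream via Stream.eval
val allAsStream: Stream[IO, ExitCode] =
  for {
    _ <- Stream.eval(result)
    _ <- Stream.eval(checked.flatMap {
           case Right(v)  => IO.pure(v)
           case Left(err) => IO.raiseError(new RuntimeException(err))
         })
    code <- server
  } yield code

// Option 2: collapse the Stream into IO via compile
val allAsIO: IO[ExitCode] =
  for {
    _    <- result
    code <- server.compile.lastOrError
  } yield code
```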

Serhii Dashko
@arosien Thank you
Hi, does anybody know of any articles about the motivation for using streams, in particular fs2? Thanks
Adam Rosien
@fbariy the answer probably depends on what paradigms and libraries you already use. happy to go from there.
Phillip Taylor
I see it's been a quiet week since I was last here asking for help. :-) I'm hoping someone can tell me why this doesn't work: https://pastebin.com/J1c9NdKW
It's a class called MetricStream with two functions, effectively enqueue() and summarise(). It hangs during the summarise call when I don't think it should
Shall I reask my question on the discord server instead?
Phillip Taylor
I have asked my question over there
Tomasz Bekas

Hello guys, I have a question about the parallel processing of streams with a discriminator. Here's an example:

override def run(args: List[String]): IO[ExitCode] =
  Random.scalaUtilRandom[IO].flatMap { implicit random =>
    val flat = Stream(
      ("a", 1),
      ("a", 2),
      ("a", 3),

      ("b", 1),
      ("b", 2),
      ("b", 3),

      ("c", 1),
      ("c", 2),
      ("c", 3)
    )

    val a = flat.filter { case (k, _) => k === "a" }.through(rndDelay)
    val b = flat.filter { case (k, _) => k === "b" }.through(rndDelay)
    val c = flat.filter { case (k, _) => k === "c" }.through(rndDelay)

    val nested = Stream(a, b, c)

    // ... (rest of run elided in the original message; see the gist)
  }

def rndDelay[F[_]: Monad: Random: Temporal, A]: Pipe[F, A, A] =
  in =>
    in.evalMap { v =>
      (Random[F].nextDouble.map(_.seconds) >>= Temporal[F].sleep) >> Applicative[F].pure(v)
    }
Gist link

My input stream looks a lot like the flat stream above, basically items with some discriminator. I would like to evaluate items with the same discriminator sequentially, but items with different discriminators in parallel. In the example above there are only 3 discriminator values, "a", "b" and "c"; in my real-world use case, however, there's an unknown number of them. How can I do this in fs2?

Guillaume Poirier
What's the best way to decompress an fs2 byte stream for a file that was compressed with gzip or bzip2? For gzip, it feels a bit awkward, but I was able to do fs2.compression.gunzip().map(_.flatMap(_.content)); for bzip2, however, I can't find any support in fs2. The best I could find is Apache Commons, which has an InputStream that can wrap the file input stream. In fs2, I would then have to convert the fs2 Stream to an InputStream (I haven't seen any built-in method for that), wrap that with the bzip2 InputStream, and then read that InputStream back into an fs2 Stream. Is there something better I can do?
Guillaume Poirier
Right, I missed that the toInputStream can be used to get the InputStream out of the fs2 stream. The main concern was about bzip2 though, is there a better way than to convert back and forth with an InputStream to apply the bzip2 InputStream decorator from apache commons?
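For the bzip2 side, the InputStream round-trip can at least stay inside one fs2 pipeline. A sketch assuming fs2 3.x and commons-compress's BZip2CompressorInputStream on the classpath; the chunk size is arbitrary:

```scala
import cats.effect.IO
import fs2.Stream
import fs2.io.{readInputStream, toInputStream}
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream

// Convert the byte stream to a java.io.InputStream, wrap it with the
// commons-compress decompressor, then read it back as an fs2 stream.
def bunzip2(in: Stream[IO, Byte]): Stream[IO, Byte] =
  in.through(toInputStream[IO]).flatMap { is =>
    readInputStream(IO(new BZip2CompressorInputStream(is)), chunkSize = 8192)
  }
```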
Hi, how can I interrupt a stream at a certain time? Is it safe enough to use interruptAfter(finiteDuration) with the calculated duration? It will never interrupt a currently running effect though, right? Even if it is just from a Stream.awakeDelay?
Matt Hicks
Is fs2.Stream a good replacement for Iterator if I want to do batches like inserting into a database?
@darkfrog26 i'd put it the other way round - Iterator is a really really bad replacement for fs2.Stream for this :)
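In that vein, a minimal sketch of batched inserts; insertBatch is a made-up stand-in for the real database call, and the batch size and timeout are arbitrary (assumes fs2 3.x):

```scala
import cats.effect.IO
import fs2.{Chunk, Stream}
import scala.concurrent.duration._

// Hypothetical stand-in for the real database insert
def insertBatch(batch: Chunk[Int]): IO[Unit] =
  IO(println(s"inserting ${batch.size} rows"))

val rows: Stream[IO, Int] = Stream.range(0, 1000)

// Batches of up to 100 rows; a partial batch is flushed after 1 second
val inserts: Stream[IO, Unit] =
  rows.groupWithin(100, 1.second).evalMap(insertBatch)
```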
Marcin Wadoń

hey :wave: I have a stream that emits some events
I'd like to merge it with a second stream that will emit only when the first will not emit for some time


sealed trait Foo
case object Bar extends Foo
case object Baz extends Foo

val timed = Stream.awakeEvery(10.seconds).map(_ => Baz)

val events$: Pipe[IO, Foo, Foo] = in$ => ???

I'd like to pass through in$ and, if there is no emit for e.g. 20 seconds, switch to timed until the next event emits on in$ (ignoring timed otherwise)
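If your fs2 version is recent enough (3.2+, worth verifying against your version's API docs), there is a Stream#keepAlive combinator that matches this pattern almost exactly; a sketch:

```scala
import cats.effect.IO
import fs2.{Pipe, Stream}
import scala.concurrent.duration._

sealed trait Foo
case object Bar extends Foo
case object Baz extends Foo

// keepAlive(maxIdle, heartbeat): emits Baz whenever the input is silent
// for longer than 20 seconds; while in$ emits, heartbeats are suppressed
val events: Pipe[IO, Foo, Foo] =
  in => in.keepAlive(20.seconds, IO.pure(Baz: Foo))
```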

Ashkan Kh. Nazary
Hello everyone :wave: can someone point me to a tutorial/talk/blog that actually explains the Pull thing? I've read a few things and got some basic ideas, plus the docs in the code, which confused me even more :D Thanks.
Ashkan Kh. Nazary
To give it some context: I am trying to write a pure decoder that reads the bytes of a certain file format (.mp4 in this case) and streams the data the file contains (some T). Actually, trying to write the code quickly made me realise Stream[F, Byte] is not my friend here (emphasis on the Stream part). I guess the reason has to do with the fact that I'm doing a stateful process on a stream.
Renārs Kudiņš
Hello! I see that the new and old implementations of fs2.concurrent.Topic differ in the following way:
  • in fs2 v2, subscribing to a Topic creates a stream that begins with the last published message
  • in fs2 v3, subscribing to a Topic creates a stream that begins with the next published message
Is this behavior change intentional? Sample repo comparing the two behaviors: https://github.com/retriku/fs2-topics
Fabio Labella
it's intentional: Topic no longer requires an initial element either, and it's closer to a broadcast queue. This was due to popular demand over the years. Btw, we have largely moved to Discord (link in the channel description); you will get prompter answers there
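A minimal sketch of the v3 behavior (assuming fs2 3.x; the sleep is only there to make sure the subscription is registered before publishing, and the maxQueued value is arbitrary):

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.Topic

// fs2 v3: a Topic is created without an initial element, and a
// subscriber only sees messages published after it subscribes
val demo: IO[List[Int]] =
  Topic[IO, Int].flatMap { topic =>
    topic
      .subscribe(maxQueued = 10)
      .take(2)
      .concurrently(
        // small delay so the subscriber is registered before we publish
        Stream.sleep_[IO](200.millis) ++ Stream(1, 2).through(topic.publish)
      )
      .compile
      .toList
  }
```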
Hunor Kovács
Hello, how do I define a stream that gets its input from an external signal, such as an HTTP request coming into my server? I can't figure out how to generate a stream from user input like that.
I don't want my entire HTTP request to be modeled as a stream and turned into a response. What I would like to do is, besides responding to the request in my handler, initiate a write to Kafka using fs2-kafka, which usually happens by producing into a stream.
Fabio Labella
Most discussion is on Discord these days, you will get a quicker answer if you ask there, we don't monitor Gitter closely anymore
anyway, you don't need a Stream to write with fs2-kafka; just call produce(..).flatten on the producer
if you do want to write to Kafka asynchronously, you can use a Queue or Channel, but I would assume you only want to return an HTTP response when the write to Kafka completes
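A sketch of the Queue variant (writeToKafka is a made-up stand-in for the real fs2-kafka produce call; assumes cats-effect 3 and fs2 3.x):

```scala
import cats.effect.IO
import cats.effect.std.Queue
import fs2.Stream

// Hypothetical stand-in for the real fs2-kafka produce call
def writeToKafka(msg: String): IO[Unit] = IO(println(s"producing: $msg"))

// Handlers enqueue; a background stream drains the queue into Kafka
val example: IO[List[String]] =
  for {
    q     <- Queue.unbounded[IO, String]
    _     <- q.offer("event-1")      // this is all an HTTP handler has to do
    writer = Stream.fromQueueUnterminated(q).evalTap(writeToKafka)
    seen  <- writer.take(1).compile.toList // normally run forever via .concurrently
  } yield seen
```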
Hunor Kovács
Thank you! I chose a Queue solution.
Alexis Hernandez

Hi everybody, I have a question that I posted in doobie's channel but I think it is worth posting it here.

I have code related to streaming content from the db:

      def f(a: fs2.Stream[cats.effect.IO, String], b: fs2.Stream[cats.effect.IO, String]): Unit =
        (a ++ b).map(println).compile.drain.unsafeRunSync()

Where a comes from doobie and is streaming data from the database, and b is an infinite stream generated from another source. The problem is that the database connection is not released until the infinite stream is closed, producing a connection leak.

On the other hand, if I add a dummy finalizer to a, the db connection gets released just after consuming the last item from a:

    def f(a: fs2.Stream[cats.effect.IO, String], b: fs2.Stream[cats.effect.IO, String]): Unit =
        (a.onFinalize(cats.effect.IO.unit) ++ b).map(println).compile.drain.unsafeRunSync()

There is likely something going on that I don't understand but I couldn't find it documented.

Any insights?

Marcin Wadoń
Hey, do I understand correctly that you want to terminate both streams together? (terminate B when A terminates)?
You can use mergeHaltLeft operator that will merge both streams and terminate when left stream terminates
Alexis Hernandez
I want to terminate a when a is consumed and keep b there indefinitely; in the first example, a's resources are only released when b finishes
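One thing worth trying (a guess, based on how fs2 can delay closing a's resource scope across the ++): introduce an explicit scope on a, which should release its resources as soon as a is exhausted, much like the dummy onFinalize does:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

def f(a: Stream[IO, String], b: Stream[IO, String]): Unit =
  // .scope forces a's finalizers (e.g. the doobie connection) to run
  // when a completes, instead of when the whole composed stream ends
  (a.scope ++ b).map(println).compile.drain.unsafeRunSync()
```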
Marcin Wadoń
btw if you want quicker response, jump to discord server
Martin Carolan

hi, we're seeing a change in the behaviour of groupWithin after the timeout is exceeded, between fs2 2.5.10 and fs2 3 and above

import cats.effect._
import fs2._

import scala.concurrent.duration.DurationInt

object FS2Playground extends IOApp.Simple {
  override def run: IO[Unit] = {
    val s = Stream
      .sleep[IO](3.second)
      .flatMap(_ => Stream.emits(Seq(1, 2)))
      .groupWithin(10, 2.seconds)
      .map(c => println(c.toList))
    for {
      _ <- s.compile.drain
    } yield ()
  }
}

fs2 2.5.10 output:

List(1, 2)

fs2 3.2.4 output:


i.e. notice the sleep time is greater than the groupWithin timeout - in later versions it immediately emits an element in this case, whereas in 2.5.10 the timeout "resets"

is this expected? and is there a way to recreate 2.5.10 behaviour in fs2 3?

Hi all, I would like to buffer n events from a Stream, but without waiting forever for upstream to fill the buffer completely. I'd rather have some timeout after which a non-full buffer is emitted. Is there already something for this, or how would you go about it? Thanks!
@eikek:matrix.org interestingly, i think you're looking for groupWithin which is discussed in the previous message.
Sergey Fedorov

Hello everyone 👋, I'm trying to implement some sort of TCP connection class based on fs2-io:

It should have these operations:

trait UserConnection[F[_], Enc[_], Dec[_]]:
  def nextMessage[T: Dec]: F[T]
  def sendMessage[T: Enc](msg: T): F[Unit]
  def closeConnection: F[Unit]

And the current implementation is this:

class TcpUserConnection[F[_]: Concurrent](socket: Socket[F])
  extends UserConnection[F, Encoder, Decoder]:

  def nextMessage[T: Decoder]: F[T] =
    // (the read-and-decode expression was elided in the original message)
      .flatMap {
        case Some(Right(value)) => value.pure[F]
        case Some(Left(error))  => error.raiseError[F, T]
        case None               => NothingReceivedException.raiseError[F, T]
      }

  def sendMessage[T: Encoder](msg: T): F[Unit] =
    // (elided in the original message)

  def closeConnection: F[Unit] = socket.endOfInput >> socket.endOfOutput

The problem I'm experiencing here is that I cannot read the message that was sent; I'm getting a ClosedChannelException.
I have this test snippet here, which reproduces the error:

import cats.effect.IOApp
import cats.syntax.all.*
import fs2.io.net.Network
import cats.effect.IO
import com.comcast.ip4s.{Host, Port}
import com.comcast.ip4s.SocketAddress

import scala.concurrent.duration.*

object Test extends IOApp.Simple:
  val server = Network[IO]
    .server(Host fromString "localhost", Port fromString "12345")
    .map(socket => TcpUserConnection(socket))
    .parEvalMapUnordered(100)(c =>
      IO(println("Waiting for message on server")) >>
        c.nextMessage[AuthForm].map(form => println(form)) >>
        IO(println("Recevied message on server"))
    )
    .compile
    .drain >>
    IO(println("Finished server !!!"))

  val client = Network[IO] // (receiver elided in the original; Network[IO] assumed)
    .client(SocketAddress(Host.fromString("localhost").get, Port.fromString("12345").get))
    .map(socket => TcpUserConnection(socket)) // (assumed, since `c` below is a TcpUserConnection)
    .use { c =>
      IO.sleep(5.seconds) >>
        IO(println("Sending message from client")) >>
        c.sendMessage[AuthForm](AuthForm(Some("user"), Some("pass"))) >>
        IO(println("Sent from client"))
    } >> IO(println("Finished client!!!"))

  val run: IO[Unit] =
    for
      serverFiber <- server.start.onError(exp => IO(println(s"Server exp: $exp")))
      _            = println("Started server")
      _           <- IO.sleep(2.seconds)
      clientFiber <- client.start.onError(exp => IO(println(s"Client exp: $exp")))
      _            = println("Started client")
      fs          <- serverFiber.join
      fc          <- clientFiber.join
      res          = (fs, fc)
      _            = println(s"Result: $fs | $fc")
    yield ()

The run result:

Started server
Started client
Waiting for message on server
Sending message from client
Sent from client
Finished client!!!
Result: Errored(java.nio.channels.ClosedChannelException) | Succeeded(IO(()))

What am I doing wrong?
Is it even possible to use fs2-io with this "sendMessage <-> nextMessage" semantic, or does it close the TCP channel after completion of .reads or .writes?

Hi guys, I was wondering if FS2 does any optimizations when compiling streams? For instance, if I chain several .filter() calls, will they be fused into a single one?
Hello everyone, I am very new to FS2 and looking for resources on learning how to write tests for streams. Can anyone help with something that can ease me into it?
Thank you!!
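One low-ceremony way in (a sketch, assuming fs2 3.x with cats-effect 3; double is a made-up example pipe): treat the pipe under test as a plain function, run it on a known input, and assert on the compiled output.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.{Pipe, Stream}

// Hypothetical pipe under test
val double: Pipe[IO, Int, Int] = _.map(_ * 2)

// Feed it a known stream, compile to a List, assert
val result: List[Int] =
  Stream(1, 2, 3).covary[IO].through(double).compile.toList.unsafeRunSync()

assert(result == List(2, 4, 6))
```

For effectful streams the same idea applies; in test suites you would typically keep the result in IO and use a framework with cats-effect support rather than unsafeRunSync.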
Hey guys, how can I get a Stream[IO, Byte] from an existing Array[Byte]? (I'm just trying to pass in a file that's returned from sttp to fs2-ftp to upload to an sftp). I'm a complete beginner to fs2 and was trying to quickly glue together an sftp upload, sorry if the question is stupid
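Not a stupid question; a minimal sketch, assuming fs2 3.x:

```scala
import cats.effect.IO
import fs2.{Chunk, Stream}

val bytes: Array[Byte] = "hello".getBytes

// Chunk.array wraps the array without copying; Stream.chunk emits it
// as a single chunk, and covary fixes the effect type to IO
val byteStream: Stream[IO, Byte] = Stream.chunk(Chunk.array(bytes)).covary[IO]
```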
Mikhail Nemenko
hey guys, I have some problems with stopping a stream
  def build(killSwitch: SignallingRef[IO, Boolean], q: Queue[IO, String]): Stream[IO, Unit] = {
    val dbApp =
      // (source stream elided in the original message)
        .evalMap(_ => Console[IO].println("GET FROM DB").as(UUID.randomUUID().toString))
        .evalTap(_ => killSwitch.get.flatMap(v => Console[IO].println(s"DONE WITH $v")))

    val flinkApp =
      Stream.eval(Console[IO].println("APP START") >> IO.sleep(100.seconds) >> Console[IO].println("APP DONE"))

    Stream(dbApp, flinkApp).parJoinUnbounded.drain
  }

  val program = for {
    killSwitch <- Stream.eval(SignallingRef[IO, Boolean](false))
    queue      <- Stream.eval(Queue.unbounded[IO, String])
    _          <- Stream.eval(Console[IO].println("BEFORE PROGRAM"))
    result     <- Stream.eval(queue.take) concurrently
                  build(killSwitch, queue)
                    .flatMap(_ => Stream.eval(Console[IO].println("AFTER PROGRAM")))
  } yield result

  val result = program.compile.lastOrError.unsafeRunSync()

this solution works perfectly until I pass in the real Flink standalone application, which consumes data from Kafka and pushes the aggregated result to the db,
like this:
flinkAppStream = Stream.eval(IO(FlinkAppEntry.main(Array.empty)))
it never stops the Stream after the computation and after taking the result from the DB
Do you have any idea how to figure out where the problem is?