Bjørn Madsen
@aeons
you can't get it out as an F[R] somehow?
Gabriel Volpe
@gvolpe
(I think) you can emit R values if you pattern match on uncons1.flatMap, for example
not sure though

but on using the second "copy" of that stream, I get the above error

right, you need to fully consume the body (bytes) once and then make any needed transformation, same with Http4s
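A minimal sketch of that approach (fs2 2.x; the names here are illustrative, not from the chat): buffer the single-use body fully, then build fresh streams from the buffered chunk as often as needed.

```scala
import cats.effect.Sync
import cats.syntax.functor._
import fs2.{Chunk, Stream}

// A network body can typically be consumed only once, so read it all
// into a Chunk first; the resulting stream can then be re-created and
// re-consumed any number of times.
def buffered[F[_]: Sync](body: Stream[F, Byte]): F[Stream[F, Byte]] =
  body.compile.to(Chunk).map(bytes => Stream.chunk(bytes).covary[F])
```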

Oleg Pyzhcov
@oleg-py
@disblader I guess you can broadcast an input and join the results? Not sure what you want to get out, but something like this is definitely possible:
def broadcasting[F[_]: Concurrent, A, B](pipes: fs2.Stream[F, Pipe[F, A, B]]): Pipe[F, A, B] =
    (input: fs2.Stream[F, A]) =>
      input.broadcast.zipWith(pipes)(_.through(_))
        .parJoinUnbounded
Gabriel Volpe
@gvolpe
I think broadcasting on a reactive stream of bytes doesn't seem to work, AFAIU
java.lang.IllegalStateException: This publisher only supports one subscriber
Domas Poliakas
@domaspoliakas
That’d run every pipe concurrently, right? That’s not quite what I’m looking for; I’ll try to elaborate on the problem
Gabriel Volpe
@gvolpe
@disblader regardless of your problem, you probably don't want to model it as a stream of a function from stream to stream (AKA pipe)
Bjørn Madsen
@aeons
@gvolpe I think I have just misunderstood how broadcast works
Gabriel Volpe
@gvolpe
Have you tried using observe @aeons ?
broadcast would be ideal if it works
Domas Poliakas
@domaspoliakas
I think trying to write the problem down revealed how the path to a pipe of streams was a frankenstein of a few half-implemented solutions, so I think I’ll just go hold backspace for a while :smile:
Gavin Bisesi
@Daenyth
Is there anything in fs2 I can use to debug why some finalizers aren't tripping?

I have a test which does:

val test: IO[Assertion] = 
  Stream.eval(setup).flatMap { data =>
    myStreamResource(data).flatMap { obj =>
      obj.resultStream.take(1).evalMap(r => IO(assert(r.isOk)))
    }
  }.compile.lastOrError

^ And that doesn't terminate. Where myStreamResource's obj.resultStream is a parJoin of multiple bracketed streams

The assert does get invoked - if I change it to a fail() then the test fails on that line
But otherwise the IO doesn't terminate
no difference whether I run with global or the scalatest EC
Hudson Clark
@hudclark

Hi all - I'm looking for a way to gracefully terminate a stream. When I cancel a stream backed by a queue, I would like for the queue to drain before terminating. Here's an example of what I'm looking for https://scastie.scala-lang.org/31krqh0ZSWuxPbZ3G9S2tQ

val stream = Stream.eval(InspectableQueue.bounded[IO, Int](5))
  .flatMap { queue =>
    Stream.range(0, 100).covary[IO].evalTap(i => IO(println(s"enqueued $i"))).through(queue.enqueue)
      .concurrently(queue.dequeue.evalTap(i => IO(println(s"dequeued $i"))).metered(1.second))
  }

val program = IO.race(
  stream.compile.drain,
  IO.sleep(2.seconds)
)

// I would like to see a "dequeue" for every "enqueue"
program.unsafeRunSync()

I'm coming from akka-streams, where I would wait to close the queue until my downstream had consumed all of the outstanding messages

Billzabob
@Billzabob
Look at the docs for concurrently. It starts the supplied stream in the background and terminates it when the first stream ends, which is why you're seeing that behavior. Take a look at merge and its derivatives; those sound more like what you want.
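An illustrative contrast of the two combinators (fs2 2.x; a sketch, not from the chat itself):

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val a = Stream(1, 2, 3).covary[IO]
val b = Stream(4, 5, 6).covary[IO]

// concurrently: b runs in the background for its effects only; it is
// cancelled as soon as a terminates, and its output is discarded.
val c1: Stream[IO, Int] = a.concurrently(b)

// merge: output from both sides is emitted; the result terminates only
// once both sides have terminated (see also mergeHaltL / mergeHaltBoth).
val c2: Stream[IO, Int] = a.merge(b)
```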
Hudson Clark
@hudclark

Thanks for the suggestions @Billzabob - I'll take a look. It's worth noting that I'm not actually using concurrently for the "real" code - it looks like this

    val messages = for {
      // resource is a BoundedQueue. An async process enqueues messages
      queue <- Stream.resource(Resource(resource))
      messages <- queue.dequeue
    } yield messages
    // Started inside of an IOApp.run
    messages.parEvalMap(10)(myProcess).evalMap(ackMessage).compile.drain

My goal is to be able to cancel the processing stream (by killing the running IOApp) without leaving any unacked messages in the queue. In other words, I'm really looking for a way to gracefully terminate the stream.

Fabio Labella
@SystemFw
@hudclark you need to use a NoneTerminatedQueue, which is basically a queue of options: enqueue None there, and the consumer will use unNoneTerminate to stop once None arrives. It's more complex if you have multiple consumers though
concurrently does not work in this case since it uses interruptWhen, which ultimately relies on the interruption mechanics, and those are non-deterministic by design
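A minimal sketch of the NoneTerminated approach (fs2 2.x, single consumer; the names are illustrative):

```scala
import cats.effect.{Concurrent, Sync}
import cats.syntax.flatMap._
import fs2.Stream
import fs2.concurrent.Queue

// The producer finishes by enqueueing None (noneTerminate appends it);
// dequeue on a NoneTerminatedQueue halts once it sees None, but only
// after draining every element enqueued before it -- a deterministic,
// graceful shutdown, unlike interruption.
def graceful[F[_]: Concurrent]: F[Unit] =
  Queue.noneTerminated[F, Int].flatMap { q =>
    val producer = Stream.range(0, 10).covary[F].noneTerminate.through(q.enqueue)
    val consumer = q.dequeue.evalMap(i => Sync[F].delay(println(s"processed $i")))
    consumer.concurrently(producer).compile.drain
  }
```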
Hudson Clark
@hudclark
I appreciate the help @SystemFw. Sounds like I'll need to use the Concurrent#cancelable builder to explicitly enqueue a None value when the stream is cancelled. The downside is that I can't expose a Stream[F, Message] in my API and still have the "graceful" shutdown. Do you know of a way to enqueue a None value and still expose a stream in my public API?
Gavin Bisesi
@Daenyth

@hudclark re: having code wait for "clean" shutdown of a NoneTerminatedQueue

It's easiest if you can control producer+consumer in one location because you can do something like:

// isFinishedSignalRef is a SignallingRef[F, Boolean] (needed for .discrete)
val read = q.dequeue.flatMap(consume) ++ Stream.eval(isFinishedSignalRef.set(true))
val write = producer.run(q) ++ Stream.eval(q.enqueue1(None))
Stream(
  (read concurrently write),
  isFinishedSignalRef.discrete.dropWhile(_ == false).take(1) >> runOnTermination
).parJoinUnbounded
I haven't come up with a good general solution - maybe wrapping up that structure in a Resource or something
Gavin Bisesi
@Daenyth
note I haven't tested the above, just wrote on the fly. But it feels right to me
Hudson Clark
@hudclark
Thanks for pointing me in the right direction, Gavin. I ended up making it work with the following
    Stream.eval(InspectableQueue.bounded[F, Option[PubSubMessageEnvelope[F]]](settings.prefetch)).flatMap { queue =>
      val read = queue.dequeue.unNoneTerminate
      // When the publisher closes, it will enqueue a None, stopping the queue
      val publisher = Stream.resource(Resource(createPublisher(queue)))
      val stopper = publisher *> Stream.eval(stop.get.attempt).take(1)
      read concurrently stopper
    }
Gavin Bisesi
@Daenyth
:+1: awesome
Nik Gushchin
@Dr-Nikson

Hello there!
Guys is it possible to have a dynamic broadcast?
I mean .. from docs:

Stream(1,2,3,4).broadcast.map { worker =>
  worker.evalMap { o => IO(println("1:" + o.toString)) }
}.take(3).parJoinUnbounded

But what if I have to deal with N streams (not 3) and this number can change over time?

Gavin Bisesi
@Daenyth
There's a lot of ways to "fan out" and it depends what overall behavior you're looking for. More context?
Nik Gushchin
@Dr-Nikson
@Daenyth oh, I see.
In my case I need to make something like:
val processors = List(
  v => v > 5,
  v => v > 10,
  // ... 
)

Stream(1,2,3,4)
  .through(Broadcast(minReady = 1))
  .take(processors.length)
  .zipWithIndex
  .map { case (src, idx) => src.filter(processors(idx)).fold(...) }
  .parJoinUnbounded
Gavin Bisesi
@Daenyth
I don't see how the N would vary here
Nik Gushchin
@Dr-Nikson
@Daenyth I mean if processors’s list is kinda dynamic, hmm
Maybe I should rethink somehow about it
Fabio Labella
@SystemFw
@Dr-Nikson dynamic broadcast --> Topic
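A sketch of that suggestion (fs2 2.x): the two predicates stand in for the dynamic processors list, and the initial value required by this Topic constructor is also delivered to each subscriber.

```scala
import cats.effect.Concurrent
import fs2.Stream
import fs2.concurrent.Topic

// Each subscribe call attaches a new consumer; subscribers can be added
// or removed at runtime, which is what makes the broadcast "dynamic".
def dynamic[F[_]: Concurrent]: Stream[F, Int] =
  Stream.eval(Topic[F, Int](0)).flatMap { topic =>
    val publish = Stream.range(1, 20).covary[F].through(topic.publish)
    val subscribers = Stream(
      topic.subscribe(maxQueued = 10).filter(_ > 5),
      topic.subscribe(maxQueued = 10).filter(_ > 10)
    )
    subscribers.parJoinUnbounded.concurrently(publish)
  }
```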
Gavin Bisesi
@Daenyth
:+1:
Nik Gushchin
@Dr-Nikson
@SystemFw @Daenyth
looks like exactly what I need.
Thanks, guys. I'm always amazed at how elegant solutions can be with fs2 and a small set of general primitives.
Gavin Bisesi
@Daenyth
I think there's an fs2 chat server example around that you could check on too, it has to handle the "dynamic subscriber" kind of problem
Nik Gushchin
@Dr-Nikson
Ok, I’ll definitely dive into it
and it appears to be up to date too
Nik Gushchin
@Dr-Nikson
🙏🏼
Adam Rosien
@arosien
oo!
Gavin Bisesi
@Daenyth
@mpilquist re: that fs2-chat app, I'm curious why not use a Topic there? It feels like a natural fit, especially if you wanted to expand to having multiple chat rooms people could join (like IRC for example)
Actually implementing an IRC server with fs2 seems like a fun job, the protocol is dead simple
one of the first real things I coded ever was an irc client
Fabio Labella
@SystemFw
I think the Map[Id, Channel] model is the most flexible
you can support one-to-one, and fairly easy broadcast
also, the old implementation of Topic uses the same concept so in a way it's a naked topic there
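The Map[Id, Channel] model described here could be sketched like this (illustrative names, not fs2-chat's actual API; fs2 2.x):

```scala
import cats.effect.Concurrent
import cats.effect.concurrent.Ref
import cats.syntax.all._
import fs2.concurrent.Queue

// One queue per connected client, tracked in a Ref: one-to-one is a map
// lookup, broadcast walks every value, and (un)registering a client is
// just a Ref update -- which is what makes the model so flexible.
final class Registry[F[_]: Concurrent, Id, A](
    ref: Ref[F, Map[Id, Queue[F, A]]]
) {
  def register(id: Id): F[Queue[F, A]] =
    Queue.unbounded[F, A].flatTap(q => ref.update(_ + (id -> q)))

  def unregister(id: Id): F[Unit] =
    ref.update(_ - id)

  def send(id: Id, a: A): F[Unit] =
    ref.get.flatMap(_.get(id).traverse_(_.enqueue1(a)))

  def broadcast(a: A): F[Unit] =
    ref.get.flatMap(_.values.toList.traverse_(_.enqueue1(a)))
}
```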