tulth
@tulth
well, it was a lazy list, but i'm converting to an fs2 stream, and i'm new to this
any suggestions on how to convert?
probably a dumb question, but is it something i should be doing with pull? is there a shorthand cons pattern-matching operator like #::?
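On the #:: question: fs2 has no cons extractor like LazyList's, but uncons1 inside a Pull plays that role. A minimal sketch, assuming fs2 2.x (the function name is made up for illustration):

```scala
import fs2.{Pull, Stream}

// Where a LazyList recursion would match `h #:: t`, an fs2 Pull matches on
// the Option[(head, tailStream)] returned by uncons1.
def firstDoubled[F[_]](s: Stream[F, Int]): Stream[F, Int] =
  s.pull.uncons1.flatMap {
    case Some((hd, _)) => Pull.output1(hd * 2) // head available, tail ignored here
    case None          => Pull.done            // empty stream
  }.stream
```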
tulth
@tulth
i think maybe i should be looking at Pipe2?
tulth
@tulth
i ended up building on another comment I found in here and came up with the following:
  def monotonicAscendingMerge[F[_], A](as: Stream[F, A], bs: Stream[F, A])(implicit ev: A => Ordered[A]): Stream[F, A] = {
    // take two streams of ordered types that are monotonically increasing
    // and create a result stream that merges the input streams to produce
    // an output that is monotonically increasing
    def go(inL: Option[(A, Stream[F, A])], inR: Option[(A, Stream[F, A])]): Pull[F, A, Unit] =
      (inL, inR) match {
        case (None, None) => Pull.done
        case (Some((lhs, lhsNext)), None) =>
          Pull.output1(lhs) >> lhsNext.pull.uncons1.flatMap(go(_, None))
        case (None, Some((rhs, rhsNext))) =>
          Pull.output1(rhs) >> rhsNext.pull.uncons1.flatMap(go(None, _))
        case (Some((lhs, lhsNext)), Some((rhs, rhsNext))) =>
          if (lhs < rhs)
            Pull.output1(lhs) >> lhsNext.pull.uncons1.flatMap(go(_, inR))
          else
            Pull.output1(rhs) >> rhsNext.pull.uncons1.flatMap(go(inL, _))
      }
    as.pull.uncons1.flatMap(uca => bs.pull.uncons1.flatMap(ucb => go(uca, ucb))).stream
  }
tulth
@tulth
very cool, list of streams not just 2
it seems even better because it handles the case where a stream is empty now but might not be empty in a future state?
Fabio Labella
@SystemFw
the main thing actually is the use of StepLeg
you need StepLeg when you Pull from more than one stream
(which is kind of an unfortunate gotcha, but still)
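The StepLeg approach can be sketched as a chunk-wise interleave of two streams, where each leg keeps its own scope while we pull from both. This is a minimal illustration assuming fs2 2.x; the name and shape are mine, not from the gist:

```scala
import fs2.{Pull, Stream}
import fs2.Stream.StepLeg

// Interleave two streams chunk by chunk. Each StepLeg tracks one stream's
// scope independently, which is what makes pulling from several streams safe.
def interleaveLegs[F[_], A](l: Stream[F, A], r: Stream[F, A]): Stream[F, A] = {
  def go(l: Option[StepLeg[F, A]], r: Option[StepLeg[F, A]]): Pull[F, A, Unit] =
    (l, r) match {
      case (None, None)     => Pull.done
      case (Some(lg), None) => Pull.output(lg.head) >> lg.stepLeg.flatMap(go(_, None))
      case (None, Some(rg)) => Pull.output(rg.head) >> rg.stepLeg.flatMap(go(None, _))
      case (Some(lg), Some(rg)) =>
        Pull.output(lg.head) >> Pull.output(rg.head) >>
          lg.stepLeg.flatMap(ln => rg.stepLeg.flatMap(rn => go(ln, rn)))
    }
  l.pull.stepLeg.flatMap(ll => r.pull.stepLeg.flatMap(rl => go(ll, rl))).stream
}
```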
tulth
@tulth
i don't really understand, but I will follow the guidance and use the gist
i will ask, what is the danger using uncons across multiple streams vs stepleg? what can happen?
tulth
@tulth
i just dropped in the gist and it worked great!
thank you!
Fabio Labella
@SystemFw

vs stepleg? what can happen?

typelevel/fs2#1678

tulth
@tulth
thank you! that answered my question
David Zhu
@noblecraft
hi, not sure if this is an fs2 question or an http4s question:
I’ve got an async API that returns a Stream of Rows:
def data(id: Int): Stream[F, Row] = {
    for {
      q <- Stream.eval(Queue.noneTerminated[F, Row])
      _ <- Stream.eval { Concurrent[F].delay {
          someApi.asyncData(id,  new AsyncDataHandler {
            override def onRow(r: Row): Unit = q.enqueue1(Some(r))
            override def onEnd(): Unit = q.enqueue1(None)
          })
        }
      }
      r <- q.dequeue
    } yield r
}
in http4s, i have:
    val rowsEndpoint = HttpRoutes.of[IO] {
      case GET -> Root / "data" / IntVar(id) => {
        Ok(data(id).map(r =>
          s"""{"col1": ${r.col1()}}"""
        ))
      }
    }
i don’t seem to be getting anything back from the http4s server when hitting the endpoint
i’ve debugged, and the onRow() and onEnd() are being hit
however it seems maybe the Stream never terminates???
Fabio Labella
@SystemFw
replied on the http4s channel, you need to unsafeRun the enqueues
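A sketch of what "unsafeRun the enqueues" means here (fs2 2.x / cats-effect 2). The callbacks return Unit, so building `q.enqueue1(...)` only creates an F[Unit] value that is then discarded; the effect has to actually be run inside the callback. `Row` and `AsyncDataHandler` are stand-ins for the types in the question:

```scala
import cats.effect.Effect
import fs2.concurrent.NoneTerminatedQueue

// Hypothetical stand-ins for the user's API types
trait Row
trait AsyncDataHandler {
  def onRow(r: Row): Unit
  def onEnd(): Unit
}

// Run the enqueue effect eagerly inside the Unit-returning callback,
// instead of constructing it and throwing it away.
def handler[F[_]: Effect](q: NoneTerminatedQueue[F, Row]): AsyncDataHandler =
  new AsyncDataHandler {
    override def onRow(r: Row): Unit =
      Effect[F].toIO(q.enqueue1(Some(r))).unsafeRunSync()
    override def onEnd(): Unit =
      Effect[F].toIO(q.enqueue1(None)).unsafeRunSync()
  }
```

With that change, `q.dequeue` actually sees the rows and the None terminator, so the stream ends.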
Alex Henning Johannessen
@ahjohannessen
@SystemFw Is your unsealing of dispatcher published somewhere? Can't find it.
Domas Poliakas
@domaspoliakas
I lack context, but ce3 M2 has Dispatcher now if that helps
Alex Henning Johannessen
@ahjohannessen
@domaspoliakas Context is that I had to introduce a wrapper for Dispatcher because the testing in fs2-grpc is a bit hairy, so I did a new trait UnsafeRunner that just wraps Dispatcher and allows for stubbing. However, that is not needed after Fabio's change that got merged.
Domas Poliakas
@domaspoliakas
Ah, gotcha
Alex Henning Johannessen
@ahjohannessen
No biggie, got all the tests to pass. Now I can delete UnsafeRunner as soon that change is published somewhere :)
Domas Poliakas
@domaspoliakas
CE3 Dispatcher is sealed though, if that's meant to be the replacement later on
Alex Henning Johannessen
@ahjohannessen
It is unsealed in upstream now :)
Domas Poliakas
@domaspoliakas
Cool :+1:
mn98
@mn98

Hi all. I'd appreciate a steer from those that know better. I'm attempting to implement a pipe to merge/align several streams with a signature like this:

def mergeIndexed[F[_], A]: Pipe[F, Seq[IndexedSeq[A]], IndexedSeq[Seq[A]]] = ???

where I'm going to keep elements in my final stream only when all indices are deemed 'aligned'.
Before I get into the weeds on this, is the right approach here to look at existing deterministic merging pipes and use StepLeg as they do?

mn98
@mn98
Actually, that's utter nonsense; my pipe is more like this (I think):
  type MergeN[F[_], C[_], A] = Seq[Stream[F, C[A]]] => Stream[F, C[Seq[A]]]
  case class Indexed[A](index: Long, value: A)  
  def mergeIndexed[F[_], A]: MergeN[F, Indexed, A] = ???
mn98
@mn98
I should add that the indices are assumed to be ordered.
tulth
@tulth
i was recently pointed to this which worked well. perhaps it can be modified to your need?
https://gist.github.com/johnynek/689199b4ac49364e7c94abef996ae59f
mn98
@mn98
@tulth thanks, I'll take a look!
corentin
@corenti13711539_twitter
I'd like to add a rate limiting mechanism in a fs2 Stream. What's the idiomatic way to do that with fs2 v2.4?
Domas Poliakas
@domaspoliakas
.metered could be what you’re looking for
Lucas Satabin
@satabin
@corenti13711539_twitter depending on your needs, you might also have a look at upperbound if metered is not working for you https://github.com/SystemFw/upperbound
corentin
@corenti13711539_twitter
thanks! :thumbsup: What would be the major differences between these two approaches?
Lucas Satabin
@satabin
I would personally start with metered and friends from fs2 core, and if it is not working for my use case, then look at an extra dependency like upperbound
corentin
@corenti13711539_twitter
Sounds reasonable since ATM we'd only need basic rate limiting :thumbsup:
Lucas Satabin
@satabin
in the core library there is also .debounce which drops elements, depending on how you want your rate limiting to behave
if you want to create a stream that executes something at a given rate, you can also have a look at the constructor functions like Stream.fixedRate or Stream.fixedDelay
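A quick sketch of the options mentioned in this thread (fs2 2.4 with cats-effect 2; the values are illustrative, nothing runs until the streams are compiled):

```scala
import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream

implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(scala.concurrent.ExecutionContext.global)

// metered: at most one element per period, nothing dropped
val limited: Stream[IO, Int] = Stream.range(0, 100).covary[IO].metered(100.millis)

// debounce: only the latest element per period survives (drops elements)
val debounced: Stream[IO, Int] = Stream.range(0, 100).covary[IO].debounce(100.millis)

// fixedRate: a ticker to drive work at a given rate
val ticks: Stream[IO, Unit] = Stream.fixedRate[IO](1.second)
```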
corentin
@corenti13711539_twitter
I'm processing a Kafka-based database update stream with fs2-kafka, so we'd need to just limit the intake rate, but not drop anything.
Fabio Labella
@SystemFw
if you only need to limit one Stream, metered should be fine
corentin
@corenti13711539_twitter
hmm. It's a combined stream created as Stream(kafkaStream1, kafkaStream2, ...).parJoin(x).
If I invoke metered I guess rate limiting is applied globally across all inner streams, right?
corentin
@corenti13711539_twitter
Also, the documentation on parJoin states "maxOpen - Maximum number of open inner streams at any time". Does that really mean what it says, rather than the number of streams that can be processed in parallel? If it's the former, then I guess I should typically just set it to the total number of inner streams. Just out of curiosity, what happens if maxOpen is smaller than the number of inner streams?
Gavin Bisesi
@Daenyth
It depends whether the inner streams terminate or not
if smaller and the inner streams never terminate, the "excess" inner streams never execute at all
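The behaviour described above can be seen with finite inner streams: with maxOpen smaller than the number of inner streams, waiting streams are started as running ones terminate, so everything is still emitted. A sketch, assuming fs2 2.x:

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream

implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)

// Three finite inner streams, at most two open at once: as each inner
// stream terminates, a waiting one is started, so all elements still
// come through (order across streams is nondeterministic).
val inner: List[Stream[IO, Int]] = List(Stream(1), Stream(2), Stream(3)).map(_.covary[IO])
val joined: Stream[IO, Int] = Stream.emits(inner).covary[IO].parJoin(maxOpen = 2)
```

If the inner streams were infinite (as with long-lived Kafka consumers), the third stream would never be opened at all, which is why maxOpen is usually set to at least the number of inner streams in that case.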