Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Jan 31 2019 04:19
    404- forked
    404-/fs2
  • Jan 31 2019 03:01
    SethTisue commented #1232
  • Jan 30 2019 17:22
  • Jan 30 2019 13:45
  • Jan 30 2019 10:48
    pchlupacek commented #1406
  • Jan 30 2019 10:47
    pchlupacek commented #1406
  • Jan 30 2019 10:39
    pchlupacek commented #1407
  • Jan 30 2019 09:58
    lJoublanc commented #870
  • Jan 30 2019 09:42
    vladimir-popov commented #1407
  • Jan 30 2019 08:10
    vladimir-popov closed #1407
  • Jan 30 2019 08:10
    vladimir-popov commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 18:57
    SystemFw commented #1406
  • Jan 29 2019 17:47
    pchlupacek commented #1406
  • Jan 29 2019 17:42
    pchlupacek commented #1406
  • Jan 29 2019 17:39
    pchlupacek commented #1407
  • Jan 29 2019 17:39
    vladimir-popov edited #1407
  • Jan 29 2019 17:38
    vladimir-popov commented #1406
  • Jan 29 2019 17:37
    pchlupacek commented #1406
Domas Poliakas
@domaspoliakas
Good evening friends. Am I reading it right that in fs3 to .compileeither the stream needs to be pure, or F must be either Concurrent or Sync?
Fabio Labella
@SystemFw
yes
Domas Poliakas
@domaspoliakas
Hm, that is kind of leading me into an odd situation. I'm loosening the Sync constraint in some places, but because of .compile I have to turn it into a Concurrent. Which makes code that was not concurrent (since it was only constrained by Sync in ce2) look concurrent.
Fabio Labella
@SystemFw
that's ok I would say, but maybe link/comment on the PR with the ones you're having doubts about
Domas Poliakas
@domaspoliakas
I don't particularly have doubts, but as I was doing it I thought to myself that it's a little odd. I guess technically Sync allows SyncIO whereas Concurrent doesn't? Maybe it's not important though. I'll leave it to yourself and other reviewers make that judgement :smile:
Michael Pilquist
@mpilquist
What’s the PR too?
If you really want both Sync or Concurrent, you can take a Compiler.Target constraint instead
Domas Poliakas
@domaspoliakas
It's on http4s, Fabio just happened to be one of the reviewers on it
Michael Pilquist
@mpilquist
Ah cool
Then Concurrent is best
tulth
@tulth
heyas, i have prior implementation for merging (sorted) two infinite lazylists, and it uses this:
  def monotonicAscendingMerge[A](as: Stream[Pure, A], bs: Stream[Pure, A])(implicit ev: A => Ordered[A]): Stream[Pure, A] = {
    (as, bs) match {
      case (Stream.empty, _) => bs
      case (_, Stream.empty) => as
      case (aHead#::aTail, bHead#::bTail) =>
        if (aHead < bHead) aHead #:: monotonicAscendingMerge(aTail, bs)
        else bHead #:: monotonicAscendingMerge(as, bTail)
    }
well, it was lazy list, but i'm converting to fs2 stream, but new to this
any suggestions on how to convert?
probably a dumb question, but is something i should be doing with pull? what is the shorthand cons pattern matching operator like #::?
tulth
@tulth
i think maybe i should be looking at Pipe2?
tulth
@tulth
i ended up building on another comment I found in here and came up with the following:
  def monotonicAscendingMerge[F[_], A](as: Stream[F, A], bs: Stream[F, A])(implicit ev: A => Ordered[A]): Stream[F, A] = {
    // take two streams or ordered types that are monotonically increasing and
    //  create a result stream that merges the input streams to
    //  produce an output that is monotonically increasing
    def go(inL: Option[(A, Stream[F, A])], inR: Option[(A, Stream[F, A])]): Pull[F, A, Unit] =
      (inL, inR) match {
        case (None, None) => Pull.done
        case (Some((lhs, lhsNext)), None) =>
          Pull.output1(lhs) >> lhsNext.pull.uncons1.flatMap(go(_, None))
        case (None, Some((rhs, rhsNext))) =>
          Pull.output1(rhs) >> rhsNext.pull.uncons1.flatMap(go(None, _))
        case (Some((lhs, lhsNext)), Some((rhs, rhsNext))) =>
          if (lhs < rhs)
            Pull.output1(lhs) >> lhsNext.pull.uncons1.flatMap(go(_, inR))
          else
            Pull.output1(rhs) >> rhsNext.pull.uncons1.flatMap(go(inL, _))
      }
    as.pull.uncons1.flatMap(uca => bs.pull.uncons1.flatMap(ucb => go(uca, ucb))).stream
  }
tulth
@tulth
very cool, list of streams not just 2
it seems better further because it handles the case where a stream is empty now, but in a future state it may not be empty?
Fabio Labella
@SystemFw
the main thing actually is the use of StepLeg
you need StepLeg when you Pull from more than one stream
(which is kind of an unfortunate gotcha, but still)
tulth
@tulth
i don't really understand but I will follow the guidance an use the gist
i will ask, what is the danger using uncons across multiple streams vs stepleg? what can happen?
tulth
@tulth
i just dropped in the gist and it worked great!
thank you!
Fabio Labella
@SystemFw

vs stepleg? what can happen?

typelevel/fs2#1678

tulth
@tulth
thank you! that answered my question
David Zhu
@noblecraft
hi not sure if this is a fs2 question or a http4s question:
I’ve got an async api that returns a Stream of Rows :
def data(id: Int): Stream[F, Row] = {
    for {
      q <- Stream.eval(Queue.noneTerminated[F, Row])
      _ <- Stream.eval { Concurrent[F].delay {
          someApi.asyncData(id,  new AsyncDataHandler {
            override def onRow(r: Row): Unit = q.enqueue1(Some(r))
            override def onEnd(): Unit = q.enqueue1(None)
          })
        }
      }
      r <- q.dequeue
    } yield r
}
in http4s, i have:
    val rowsEndpoint = HttpRoutes.of[IO] {
      case GET -> Root / “data" / IntVar(id) => {
        Ok(data(id).map(r =>
          s""”{"col1": ${r.col1()}}"""
        ))
      }
    }
i don’t seem to be getting anything back from the http4s server when hitting the endpoint
i’ve debugged, and the onRow() and onEnd() are being hit
however it seems maybe the Stream never terminates???
Fabio Labella
@SystemFw
replied on the http4s channel, you need to unsafeRun the enqueues
Alex Henning Johannessen
@ahjohannessen
@SystemFw Is your unsealing of dispatcher published somewhere? Can't find it.
Domas Poliakas
@domaspoliakas
I lack context, but ce3 M2 has Dispatcher now if that helps
Alex Henning Johannessen
@ahjohannessen
@domaspoliakas Context is that I had to introduce a wrapper for Dispatcher because the testing in fs2-grpc is a bit hairy, so I did a new trait UnsafeRunner that just wraps Dispatcher and allows for stubbing. However, that is not needed after Fabio's change that got merged.
Domas Poliakas
@domaspoliakas
Ah, gotcha
Alex Henning Johannessen
@ahjohannessen
No biggie, got all the tests to pass. Now I can delete UnsafeRunner as soon that change is published somewhere :)
Domas Poliakas
@domaspoliakas
CE3 Dispatcher is sealed though, if that's meant to be the replacement later on
Alex Henning Johannessen
@ahjohannessen
It is unsealed in upstream now :)
Domas Poliakas
@domaspoliakas
Cool :+1:
mn98
@mn98

Hi all. I'd appreciate a steer from those that know better. I'm attempting to implement a pipe to merge/align several streams with a signature like this:

def mergeIndexed[F[_], A]: Pipe[F, Seq[IndexedSeq[A]], IndexedSeq[Seq[A]]] = ???

where I'm going to keep elements in my final stream only when all indices are deemed 'aligned'.
Before I get into the weeds on this, is the right approach here to look at existing deterministic merging pipes and use StepLeg as they do?

mn98
@mn98
Actually, that's utter nonsense, my pipe is actually like this (I think):
  type MergeN[F[_], C[_], A] = Seq[Stream[F, C[A]]] => Stream[F, C[Seq[A]]]
  case class Indexed[A](index: Long, value: A)  
  def mergeIndexed[F[_], A]: MergeN[F, Indexed, A] = ???
mn98
@mn98
I should add that the indices are assumed to be ordered.
tulth
@tulth
i was recently pointed to this which worked well. perhaps it can be modified to your need?
https://gist.github.com/johnynek/689199b4ac49364e7c94abef996ae59f
mn98
@mn98
@tulth thanks, I'll take a look!
corentin
@corenti13711539_twitter
I'd like to add a rate limiting mechanism in a fs2 Stream. What's the idiomatic way to do that with fs2 v2.4?
Domas Poliakas
@domaspoliakas
.metered could be what you’re looking for
Lucas Satabin
@satabin
@corenti13711539_twitter depending on your needs, you might also have a look at upperbound if metered is not working for you https://github.com/SystemFw/upperbound