David Zhu
@noblecraft
however it seems maybe the Stream never terminates???
Fabio Labella
@SystemFw
replied on the http4s channel, you need to unsafeRun the enqueues
Alex Henning Johannessen
@ahjohannessen
@SystemFw Is your unsealing of dispatcher published somewhere? Can't find it.
Domas Poliakas
@domaspoliakas
I lack context, but ce3 M2 has Dispatcher now if that helps
Alex Henning Johannessen
@ahjohannessen
@domaspoliakas Context is that I had to introduce a wrapper for Dispatcher because the testing in fs2-grpc is a bit hairy, so I did a new trait UnsafeRunner that just wraps Dispatcher and allows for stubbing. However, that is not needed after Fabio's change that got merged.
Domas Poliakas
@domaspoliakas
Ah, gotcha
Alex Henning Johannessen
@ahjohannessen
No biggie, got all the tests to pass. Now I can delete UnsafeRunner as soon as that change is published somewhere :)
Domas Poliakas
@domaspoliakas
CE3 Dispatcher is sealed though, if that's meant to be the replacement later on
Alex Henning Johannessen
@ahjohannessen
It is unsealed in upstream now :)
Domas Poliakas
@domaspoliakas
Cool :+1:
mn98
@mn98

Hi all. I'd appreciate a steer from those that know better. I'm attempting to implement a pipe to merge/align several streams with a signature like this:

def mergeIndexed[F[_], A]: Pipe[F, Seq[IndexedSeq[A]], IndexedSeq[Seq[A]]] = ???

where I'm going to keep elements in my final stream only when all indices are deemed 'aligned'.
Before I get into the weeds on this, is the right approach here to look at existing deterministic merging pipes and use StepLeg as they do?

mn98
@mn98
Actually, that's utter nonsense, my pipe is actually like this (I think):
  type MergeN[F[_], C[_], A] = Seq[Stream[F, C[A]]] => Stream[F, C[Seq[A]]]
  case class Indexed[A](index: Long, value: A)  
  def mergeIndexed[F[_], A]: MergeN[F, Indexed, A] = ???
mn98
@mn98
I should add that the indices are assumed to be ordered.
tulth
@tulth
i was recently pointed to this which worked well. perhaps it can be modified to your need?
https://gist.github.com/johnynek/689199b4ac49364e7c94abef996ae59f
mn98
@mn98
@tulth thanks, I'll take a look!
corentin
@corenti13711539_twitter
I'd like to add a rate limiting mechanism in a fs2 Stream. What's the idiomatic way to do that with fs2 v2.4?
Domas Poliakas
@domaspoliakas
.metered could be what you’re looking for
Lucas Satabin
@satabin
@corenti13711539_twitter depending on your needs, you might also have a look at upperbound if metered is not working for you https://github.com/SystemFw/upperbound
corentin
@corenti13711539_twitter
thanks! :thumbsup: What would be the major differences between these two approaches?
Lucas Satabin
@satabin
I would personally start with metered and friends from fs2 core, and if it is not working for my use case, then look at an extra dependency like upperbound
corentin
@corenti13711539_twitter
Sounds reasonable since ATM we'd only need basic rate limiting :thumbsup:
Lucas Satabin
@satabin
in the core library there is also .debounce which drops elements, depending on how you want your rate limiting to behave
if you want to create a stream that executes something at a given rate, you can also have a look at the constructor functions like Stream.fixedRate or Stream.fixedDelay
corentin
@corenti13711539_twitter
I'm processing a Kafka-based database update stream with fs2-kafka, so we'd need to just limit the intake rate, but not drop anything.
Fabio Labella
@SystemFw
if you only need to limit one Stream, metered should be fine
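For reference, a minimal sketch of what rate limiting a single stream with metered could look like. It assumes fs2 2.x on cats-effect 2 (where metered needs a Timer); the helper name rateLimited is hypothetical and not part of fs2.

```scala
import cats.effect.Timer
import fs2.Stream
import scala.concurrent.duration._

// Hypothetical helper: lets at most one element through per `every` interval.
// Nothing is dropped; the upstream is simply pulled more slowly.
def rateLimited[F[_]: Timer, A](in: Stream[F, A], every: FiniteDuration): Stream[F, A] =
  in.metered(every)
```

Because metered only slows down pulling, this suits the "limit intake, drop nothing" case, unlike debounce.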
corentin
@corenti13711539_twitter
hmm. It's a combined stream created as Stream(kafkaStream1, kafkaStream2, ...).parJoin(x).
If I invoke metered I guess rate limiting is applied globally across all inner streams, right?
corentin
@corenti13711539_twitter
Also, the documentation on parJoin states "maxOpen - Maximum number of open inner streams at any time". Does that really mean what it reads and not the number of streams that can be processed in parallel? If it's the former, then I guess I should typically just set it to the total number of inner streams. Just out of curiosity, what happens if maxOpen is smaller than the number of inner streams?
Gavin Bisesi
@Daenyth
It depends whether the inner streams terminate or not
if smaller and the inner streams never terminate, the "excess" inner streams never execute at all
if they do terminate, then it does what you'd expect - run N to completion at any one time. Once one terminates, start the next
corentin
@corenti13711539_twitter
:thumbsup: ok, so since in my case the inner streams read data from different topics, parJoinUnbounded is probably the right combinator to use.
Gavin Bisesi
@Daenyth
yes - it's also not really unbounded there. The number of topics is bounded by your app structure and config
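To make the "global vs per inner stream" question concrete, here is a rough sketch of both placements of metered around parJoinUnbounded. It assumes fs2 2.x on cats-effect 2; globalLimit and perTopicLimit are hypothetical names, and plain lists of streams stand in for the Kafka topic streams.

```scala
import cats.effect.{Concurrent, Timer}
import fs2.Stream
import scala.concurrent.duration._

// metered applied after merging: one shared budget across all inner streams.
def globalLimit[F[_]: Concurrent: Timer, A](
    topics: List[Stream[F, A]],
    every: FiniteDuration
): Stream[F, A] =
  Stream.emits(topics).covary[F].parJoinUnbounded.metered(every)

// metered applied to each inner stream before merging: each topic gets its own budget.
def perTopicLimit[F[_]: Concurrent: Timer, A](
    topics: List[Stream[F, A]],
    every: FiniteDuration
): Stream[F, A] =
  Stream.emits(topics.map(_.metered(every))).covary[F].parJoinUnbounded
```

With the first variant, a busy topic can use up most of the shared budget; with the second, the total rate grows with the number of topics.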
Sergey Torgashov
@satorg

Hi guys. I have been working on some quite sophisticated nested stream processing and found that one way to approach the problem could be an evalFold method in Stream (which would look like the regular fold but would take an (O2, O) => F[O2] function). But there's no such method in Stream, although there are many other eval* counterparts for effectless methods.

I wonder, is it due to some conceptual concern about such a method, or is evalFold just still waiting for its hero to implement it?

Gavin Bisesi
@Daenyth
Isn't there an evalScan?
yes. evalFold should be easy then
conceptually fold is just scan.last
even if you might implement it lower than that for efficiency
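A small sketch of the "fold is just scan.last" idea, written as a standalone function rather than a Stream method. The name evalFold and its shape are assumptions for illustration; only evalScan and lastOr are existing fs2 combinators.

```scala
import fs2.Stream

// Hypothetical evalFold built from evalScan:
// evalScan emits the seed and every intermediate accumulator,
// so keeping only the last element gives the folded result.
def evalFold[F[_], O, O2](in: Stream[F, O])(z: O2)(f: (O2, O) => F[O2]): Stream[F, O2] =
  in.evalScan(z)(f).lastOr(z)
```

For example, folding the elements 1, 2, 3 with an effectful addition starting from 0 yields a single-element stream containing 6, and an empty input yields just the seed.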
Sergey Torgashov
@satorg
Yes, indeed, thanks for the hint. I was just confused that there are plenty of eval* methods but no evalFold (or evalReduce).
Sergey Torgashov
@satorg

To me it should be as easy as the following:

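    def evalFold[F2[x] >: F[x], O2](z: O2)(f: (O2, O) => F2[O2]): Stream[F2, O2] = {

      // Pull one element at a time, threading the accumulator `zz` through.
      def go(in: Stream[F2, O])(zz: O2): Pull[F2, INothing, O2] = {
        in.pull.uncons1.flatMap {
          // Input exhausted: return the final accumulator as the Pull's result.
          case None => Pull.pure(zz)
          // Run the effectful step, then continue with the tail.
          case Some((hd, tl)) =>
            Pull.eval(f(zz, hd)).flatMap { go(tl) }
        }
      }

      // Emit the final accumulator as the single output element.
      go(this)(z).flatMap(Pull.output1).stream
    }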

I borrowed it from implementations of evalScan and regular fold, actually.

Rafi Baker
@BakerRafi_twitter
Is there a combinator to reduce a Stream to singleton Stream of Unit?
Fabio Labella
@SystemFw
@satorg it's the usual problem of knowing where to stop
like, do we add eval versions of everything?
@BakerRafi_twitter why do you need it to be a singleton?
anyway you can do .drain ++ Stream(()) , or void.foldMonoid
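A minimal sketch of the first suggestion, wrapped in a helper so the types are visible. The name unitAtEnd is hypothetical; drain and ++ are the fs2 combinators mentioned above.

```scala
import fs2.Stream

// Hypothetical helper: run `in` for its effects, discard every output,
// then emit a single () once it has completed.
def unitAtEnd[F[_], A](in: Stream[F, A]): Stream[F, Unit] =
  in.drain ++ Stream.emit(())
```

The () is only emitted after the source finishes successfully, so a downstream step chained on it runs exactly once.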
Rafi Baker
@BakerRafi_twitter
Thanks. I have a db insert that emits a Stream of ids, which I don't need in the next step. But having this in a for comprehension makes the downstream steps repeat for every id emitted.
Fabio Labella
@SystemFw
right
I would recommend against for in general
what you want to express there is ++ really
Gavin Bisesi
@Daenyth
yeah, for is often an antipattern with streams
Generally only useful for parts that could be done in Resource/F anyway
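To illustrate the difference being discussed, here is a sketch contrasting the two shapes. The names perId and afterAll are hypothetical, and ids and next stand in for the db insert stream and the downstream step.

```scala
import fs2.Stream

// for-comprehension (flatMap): `next` is run once per element emitted by `ids`.
def perId[F[_], A, B](ids: Stream[F, A], next: Stream[F, B]): Stream[F, B] =
  for {
    _ <- ids
    b <- next
  } yield b

// Sequencing with ++: `ids` is run to completion, its output is discarded,
// and `next` is run exactly once afterwards.
def afterAll[F[_], A, B](ids: Stream[F, A], next: Stream[F, B]): Stream[F, B] =
  ids.drain ++ next
```

If ids emits three elements, perId runs next three times while afterAll runs it once, which matches the repetition described above.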