Fabio Labella
@SystemFw
e.g. cormorant
Peter Storm
@peterstorm
Ah cool, cheers
Rafi Baker
@BakerRafi_twitter
How do I do parMapN on Streams? I want to do something like (Stream(..), Stream(..), …).parMapN(MyClass.apply). Where do I get NonEmptyParallel for Streams?
Fabio Labella
@SystemFw
@BakerRafi_twitter if you have two streams, you can do parZip
if you have more, it's not really supported
Rafi Baker
@BakerRafi_twitter
Got it, thanks!
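For readers following along, here is a minimal sketch of the parZip suggestion, assuming fs2 3.x on cats-effect 3 (the chat itself may predate those versions). The `Row` case class and the three input streams are hypothetical stand-ins for `MyClass` and its arguments. Nesting parZip is only one possible workaround for more than two streams, not a built-in N-ary operation.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// Hypothetical target type, standing in for MyClass from the question.
final case class Row(id: Int, name: String, active: Boolean)

val ids: Stream[IO, Int]        = Stream(1, 2, 3).covary[IO]
val names: Stream[IO, String]   = Stream("a", "b", "c").covary[IO]
val flags: Stream[IO, Boolean]  = Stream(true, false, true).covary[IO]

// parZip pulls from both sides concurrently and pairs elements positionally.
// For a third stream, zip again and unpack the nested tuple.
val rows: Stream[IO, Row] =
  ids.parZip(names).parZip(flags).map { case ((id, name), active) =>
    Row(id, name, active)
  }

val result: List[Row] = rows.compile.toList.unsafeRunSync()
```

As with zip, the result ends as soon as the shortest input ends.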
Nicolò Martini
@nicmart
Hello everybody and thank you for all the time you put in this awesome library!
I am trying to do something that in my head should be quite straightforward: convert an infinite pure stream to a Scala Stream (or LazyList, or even an Iterator).
What's the best way to do that?
I have tried to use the to method, but the following does not terminate (I guess that the underlying Collector tries to collect all the elements of the stream):
fs2.Stream.constant(1).compile.to(Stream)
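A small sketch of the contrast, assuming the guess above is right and that compiling a stream into a collection builds it strictly: bounding the stream before collecting terminates, while the unbounded version keeps accumulating. This only illustrates the symptom; it does not give a lazy conversion.

```scala
import fs2.Stream

// Bounded: take(5) makes the pure stream finite, so collecting it terminates.
val firstFive: List[Int] = Stream.constant(1).take(5).toList

// Unbounded: the equivalent call without take would never return,
// because every element has to be collected before a result is produced.
// val never = Stream.constant(1).toList
```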
Fabio Labella
@SystemFw
mm interesting, that's not so easy actually
Fabio Labella
@SystemFw
so, I don't think you can transform an fs2.Stream[Pure, A] into std.Stream or LazyList, because those two don't support the fact that the tail of the stream depends on the production of the head
so Iterator is your best bet
an arbitrary Stream[F, A] => Iterator has its complexity as well, but here the stream is pure
@nicmart do you need thread safety on the resulting iterator?
Nicolò Martini
@nicmart
Hi @SystemFw, no, thread safety is not required.
Fabio Labella
@SystemFw
ok, that makes it easier, although you still have a not exactly trivial state machine to deal with
Nicolò Martini
@nicmart

Is it possible to have a function that transforms both the head and the tail, like

def withHeadAndTail[A, B](
  fa: fs2.Stream[Pure, A])(
  f: Option[(A, fs2.Stream[Pure, A])] => B
): B = ???

I could then easily use that recursively to build a Stream/LazyList.

Fabio Labella
@SystemFw
the basis of this is Pull
the problem with this approach is that the api you want is really more similar to Iterator than to LazyList
I might PR a Stream.toIterator for Pure and effectful streams, but that doesn't solve your immediate problem
Fabio Labella
@SystemFw
it's actually quite hard for effectful streams when it comes to resource safety, but hopefully I can give you a snippet for pure streams soon-ish
Dmitry Polienko
@nigredo-tori
I believe it would be something like this:
fa.pull.uncons1
  .map(f)
  .flatMap(Pull.output1)
  .stream
  .compile.last
  .getOrElse(sys.error("impossible"))
Fabio Labella
@SystemFw
you need a state machine over that, yes
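Putting the pieces of this exchange together, here is an untested-in-production sketch of `withHeadAndTail` built on the uncons1 snippet above, plus the recursive LazyList construction from the question. It assumes fs2 3.x and Scala 2.13, and it applies to pure streams only. `toLazyList` is a hypothetical helper name. Each step re-runs the stream interpreter, so treat it as an illustration rather than an efficient conversion.

```scala
import fs2.{Pull, Pure, Stream}

def withHeadAndTail[A, B](fa: Stream[Pure, A])(
    f: Option[(A, Stream[Pure, A])] => B
): B = {
  // Pull at most one element, apply f to the result, and output it once.
  val out: Stream[Pure, B] =
    fa.pull.uncons1.flatMap(r => Pull.output1(f(r))).stream
  out.compile.last
    .getOrElse(sys.error("impossible: exactly one element is always output"))
}

// Hypothetical helper: #:: takes its tail by name, so the recursive call
// only runs when the next cell of the LazyList is forced.
def toLazyList[A](s: Stream[Pure, A]): LazyList[A] =
  withHeadAndTail(s) {
    case Some((head, tail)) => head #:: toLazyList(tail)
    case None               => LazyList.empty[A]
  }

val firstThree: List[Int] = toLazyList(Stream.constant(1)).take(3).toList
val finite: List[Int]     = toLazyList(Stream(1, 2, 3)).toList
```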
subhash28
@subhash28
Hello all, is there a way to get an fs2 stream from the console output of a shell script? I can get a LazyList with Process("my-shell-script").lazyLines. Any leads are much appreciated.
Ryan Zeigler
@rzeigler
For the simplest version, you can plug io.readInputStream onto the Java process API. There is also a library called prox, which offers a harder-to-misuse API.
subhash28
@subhash28
Thanks Ryan, seems prox is exactly what I need.
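A rough sketch of the "simplest version" mentioned above, assuming fs2 3.1 or later on cats-effect 3 (earlier versions use a different readInputStream signature and `text.utf8Decode`). `processLines` is a hypothetical helper name, and the example command assumes a Unix-like system with `echo` on the PATH. Error output and exit codes are ignored here, which is part of why a dedicated library may be the safer choice.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.{Stream, text}

// Hypothetical helper: starts `command` and streams its stdout line by line.
def processLines(command: String*): Stream[IO, String] =
  Stream
    // bracket makes sure the process is destroyed when the stream ends or fails.
    .bracket(IO.blocking(new ProcessBuilder(command: _*).start()))(p =>
      IO.blocking(p.destroy())
    )
    .flatMap { process =>
      fs2.io
        .readInputStream(IO.pure(process.getInputStream), chunkSize = 4096)
        .through(text.utf8.decode)
        .through(text.lines)
    }

val lines: List[String] =
  processLines("echo", "hello").compile.toList.unsafeRunSync()
```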
Fabio Labella
@SystemFw

@nicmart

Completely untested (so probably buggy), and does not generalise to effectful streams along several directions, but try this

  import cats.implicits._ // for a.some
  import fs2.{Pull, Pure, Stream}

  def toIter[A](s: Stream[Pure, A]): Iterator[A] = {
    sealed trait State {
      def hasNext: (State, Boolean) = this match {
        case Idle(s) =>
          s.pull.uncons1
            .flatMap(Pull.output1)
            .stream
            .compile
            .last
            .flatten match {
            case Some((a, next)) => Pulled(a, next) -> true
            case None => Exhausted -> false
          }
        case s @ Pulled(_, _) => s -> true
        case s @ Exhausted => s -> false
      }

      def next: (State, Option[A]) = this match {
        case Idle(_) => hasNext._1.next
        case Pulled(a, tail) => Idle(tail) -> a.some
        case s @ Exhausted => s -> None
      }
    }
    case class Idle(stream: Stream[Pure, A]) extends State
    case class Pulled(a: A, tail: Stream[Pure, A]) extends State
    case object Exhausted extends State

    new Iterator[A] {
      var state: State = Idle(s)
      def hasNext = {
        val (ns, r) = state.hasNext
        state = ns
        r
      }

      def next = {
        val (ns, r) = state.next
        state = ns
        r.getOrElse(throw new Exception("Exhausted"))
      }
    }
  }
Nicolò Martini
@nicmart
Thanks @SystemFw! I will try it as soon as I can!
Gavin Bisesi
@Daenyth

@SystemFw re: Daniel's question about upperbound/Limiter etc

How hard would it be to create a way to compose the product of two Limiters, such that it honors the more restrictive rate of the two?

Fabio Labella
@SystemFw
mm not too sure
the library needs some love in general, it was created to serve a pretty specific work need at my old place
Gavin Bisesi
@Daenyth
:+1:
Fabio Labella
@SystemFw
I'm glad people seem to find it useful as it is, but it would probably benefit from a nicer redesign
Gavin Bisesi
@Daenyth
sure
Fabio Labella
@SystemFw
the rate is controlled by a SignallingRef, which is exposed
so it shouldn't be impossible, but I haven't given it too much thought
Gavin Bisesi
@Daenyth
Hmm, I think I recall that Applicative[Signal] got in before?
That it wasn't there at first but finally got added some time ago
signal/signallingref
oh no wait, wouldn't it have to be Invariant in order to support set+get?
is there a thing for invariant applicatives...?
Gavin Bisesi
@Daenyth
yeah Applicative[Signal] but Invariant[SignallingRef]
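To make the Applicative[Signal] idea concrete, here is a minimal sketch assuming fs2 3.x on cats-effect 3. It does not use the Limiter API at all: the two `SignallingRef`s are hypothetical stand-ins for two rate settings, and `_ min _` is just one example of a combining function. The combined value is a read-only Signal, which matches the point that the writable side would need something other than Applicative.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
import fs2.concurrent.{Signal, SignallingRef}

val program: IO[(Int, Int)] = for {
  a <- SignallingRef[IO, Int](10)
  b <- SignallingRef[IO, Int](3)
  // Widen to Signal so the Applicative instance applies, then combine.
  combined = (a: Signal[IO, Int], b: Signal[IO, Int]).mapN(_ min _)
  before <- combined.get // reads both current values
  _      <- b.set(20)
  after  <- combined.get // reflects the update to b
} yield (before, after)

val observed: (Int, Int) = program.unsafeRunSync()
```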
Gavin Bisesi
@Daenyth
@SystemFw well at minimum I've so far confirmed that
  • I'm really happy for TestContext
  • I'm really happy I spent effort writing a does-not-deadlock test for this code
  • It's not safe to nest Limiter.await calls for two independent limiters
I'm trying to follow the logic of how it's deadlocking specifically
I'm passing the default queue limit of MaxValue so it's not that
Nicolò Martini
@nicmart
@SystemFw I quickly tested your iterator code and it works very well from what I can see.
Probably I will also need to extend it to the effectful case, but this is already a lot, thank you!
Fabio Labella
@SystemFw
that's significantly harder
unfortunately
I'm specifically not sure about a Stream which opens resources, but whose iterator does not complete
Gavin Bisesi
@Daenyth
^ the limiter thing might actually be due to the test
Fabio Labella
@SystemFw
you already would need to use extendScope to delay the finaliser opened by a pull, and attach it to the rest of the Stream
but that means you can't then run finalisers in case of problems, without having to run the rest of the stream as well