Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Feb 01 10:11
    @SystemFw banned @Hudsone_gitlab
  • Jan 31 2019 04:19
    404- forked
    404-/fs2
  • Jan 31 2019 03:01
    SethTisue commented #1232
  • Jan 30 2019 17:22
  • Jan 30 2019 13:45
  • Jan 30 2019 10:48
    pchlupacek commented #1406
  • Jan 30 2019 10:47
    pchlupacek commented #1406
  • Jan 30 2019 10:39
    pchlupacek commented #1407
  • Jan 30 2019 09:58
    lJoublanc commented #870
  • Jan 30 2019 09:42
    vladimir-popov commented #1407
  • Jan 30 2019 08:10
    vladimir-popov closed #1407
  • Jan 30 2019 08:10
    vladimir-popov commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 18:57
    SystemFw commented #1406
  • Jan 29 2019 17:47
    pchlupacek commented #1406
  • Jan 29 2019 17:42
    pchlupacek commented #1406
  • Jan 29 2019 17:39
    pchlupacek commented #1407
  • Jan 29 2019 17:39
    vladimir-popov edited #1407
  • Jan 29 2019 17:38
    vladimir-popov commented #1406
Michael Pilquist
@mpilquist
Try adding fork in run := true to build.sbt
The default behavior of run is to not fork a JVM and instead just call your main method. When you ctrl-c, SBT tries interrupting the main thread, which doesn't work well with IOApp
Add outputStrategy := Some(StdoutOutput) too
Some discussion here if interested more in why it's happening: https://gitter.im/typelevel/cats-effect?at=5dd2eb9ceeb63e1a83d878f3
James Cosford
@jamescosford
OK great I will try those, and read that too.
Thanks @mpilquist
Awesome, fork worked!
Ion Alberdi
@yetanotherion

Hello, we managed to reproduce a bug we have in production in the snippet below

import cats.effect._
import fs2._
import scala.concurrent.duration._

sealed trait LeftOrRight
case object Left extends LeftOrRight
case object Right extends LeftOrRight

object Hello extends IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    val left: Stream[IO, LeftOrRight] = Stream.eval(IO.sleep(1.milli).map(_ => Left))
    val right: Stream[IO, LeftOrRight] = Stream.awakeEvery[IO](1.milli).map(_ => Right)
    val main: Stream[IO, Unit] = left.mergeHaltL(right).last.flatMap({
      case Some(Left) => Stream.emit(())
      case Some(Right) => Stream.raiseError[IO](new Exception("Stream ends with Right"))
      case None => Stream.raiseError[IO](new Exception("Unexpected empty stream"))
    })
    main.repeatN(10)
      .compile.drain
      .map(_ => ExitCode.Success)
  }
}

By executing it, it ends with "Stream ends with Right", meaning the last element from the stream comes from the right.
AFAIU, reading the implementation

def mergeHaltL[F2[x] >: F[x]: Concurrent, O2 >: O](that: Stream[F2, O2]): Stream[F2, O2] =
    noneTerminate.merge(that.map(Some(_))).unNoneTerminate

the behavior can be expected (the last element from the stream comes from the left indeed.
However this element is None, and an element from the right stream could have been pulled before which would trigger the behavior described above).
Do you think it's a bug?

Fabio Labella
@SystemFw
@yetanotherion I don't think it is
mergeHaltR and mergeHaltL aren't really giving you a guarantee about from which side the last element is coming from
they are only telling you which side is going to trigger interruption

However this element is None, and an element from the right stream could have been pulled before which would trigger the behavior described above

what I mean is that this statement is correct, and it describes the behaviour you see

Brandon G
@gannicottb

Is this still true?

Most concurrent queues in FS2 support tracking their size, which is handy for implementing size-based throttling of the producer.

Or to cut to the chase, what's the easiest way to enqueue a None to a NoneTerminatedQueue when the stream runs out of items?
Brandon G
@gannicottb
so far I've done this:
(stream.take(10).map(item => Some(item)) ++ Stream(None)).evalTap(e => q.enqueue1(e)).compile.drain
I was hoping that maybe I could skip the mapping and concatenation on the source stream, and do the Option wrapping/None passing in the evalTap
Brandon G
@gannicottb
is it appropriate to use onFinalize to enqueue the terminating None?
stream.take(10).evalTap(e => q.enqueue1(Some(e))).onFinalize(q.enqueue1(None)).compile.drain
Fabio Labella
@SystemFw
@gannicottb your first code is equivalent to the noneTerminate combinator
onFinalize is perhaps more appropriate in this case
Brandon G
@gannicottb
haha, didn't know .noneTerminate existed. the onFinalize approach at least doesn't leave the queue hanging if something happens to the stream, might go that way for now
Ion Alberdi
@yetanotherion

mergeHaltR and mergeHaltL aren't really giving you a guarantee about from which side the last element is coming from they are only telling you which side is going to trigger interruption

@SystemFw thank you, do you think it would be worth mentioning somewhere in the code / doc? I'd be glad to give it a try (if it would be welcome)

Fabio Labella
@SystemFw
why not, clarifying scaladoc is always useful
Paulius Imbrasas
@CremboC
is there something like scan, but the output type is not the same as the state? i.e.
def scan2[O2, O3](z: O2)(f: (O2, O) => (O2, O3)): Stream[F, O3]
Ion Alberdi
@yetanotherion

why not, clarifying scaladoc is always useful

functional-streams-for-scala/fs2#1837

Fabio Labella
@SystemFw
@CremboC isn't that basically mapAccumulate?
Paulius Imbrasas
@CremboC
yeah, it is, just need to do a map afterwards, but good enough :P
mapAccumulate returns Stream[F, (O2, O3)]
otto-dev
@otto-dev
Hi guys. Is there a way how I can apply a stateful mapping function, such as (S, A) => Option[(S, B)] to each element, where S is some state that is carried along for each element, based on the previous result, while B is the element to be emitted? Similar to scanChunksOpt, but applying to elements, not chunks
otto-dev
@otto-dev
@CremboC Hey, that's exactly my question as well, just in different words! mapAccumulate is a good start, but did you find any other approach?
Fabio Labella
@SystemFw
I would go with mapAccumulate(..).unNone.map(_._2)
but I'm not sure what the meaning of Option is in your example
otto-dev
@otto-dev
Thanks for your tip. The meaning of None is end-of-stream
But that's not the focus of my question anyway. It's just the signature that I'm familiar with. Primary question was how to map statefully
otto-dev
@otto-dev
mapAccumulate on Chunk seems to have the signature @CremboC and I are looking for
Gavin Bisesi
@Daenyth
Is A != B?
unfold or unfoldEval may be part of what you want
you could also have a stream with (non-concurrent) effects in StateT maybe
otto-dev
@otto-dev
Yeah it's like unfold, but on a stream
Gavin Bisesi
@Daenyth
Right, Stream's companion object has such methods
otto-dev
@otto-dev
Yes, but that would mean that the actual stream instance becomes part of the state passed as zero into unfold, which becomes quite clumsy
Gavin Bisesi
@Daenyth
I don't follow
Do you mean that you have a stream of elements and you want to apply the state transformations over them?
otto-dev
@otto-dev
Yes, that's right
Gavin Bisesi
@Daenyth
rather than using unfold to produce the stream in the first place from a seed/
ok, makes sense
otto-dev
@otto-dev
You got me
Gavin Bisesi
@Daenyth
mapAccumulate + unNoneTerminate does sound close to right
otto-dev
@otto-dev
Thanks for your lead
I think I have got it now, digging the source code a bit using the IDE
Here is what I've got: Say, as an example, you want to calculate the Int difference between the current element compared to the last element, as a simple stateful example