Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Feb 01 10:11
    @SystemFw banned @Hudsone_gitlab
  • Jan 31 2019 04:19
    404- forked
    404-/fs2
  • Jan 31 2019 03:01
    SethTisue commented #1232
  • Jan 30 2019 17:22
  • Jan 30 2019 13:45
  • Jan 30 2019 10:48
    pchlupacek commented #1406
  • Jan 30 2019 10:47
    pchlupacek commented #1406
  • Jan 30 2019 10:39
    pchlupacek commented #1407
  • Jan 30 2019 09:58
    lJoublanc commented #870
  • Jan 30 2019 09:42
    vladimir-popov commented #1407
  • Jan 30 2019 08:10
    vladimir-popov closed #1407
  • Jan 30 2019 08:10
    vladimir-popov commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 18:57
    SystemFw commented #1406
  • Jan 29 2019 17:47
    pchlupacek commented #1406
  • Jan 29 2019 17:42
    pchlupacek commented #1406
  • Jan 29 2019 17:39
    pchlupacek commented #1407
  • Jan 29 2019 17:39
    vladimir-popov edited #1407
  • Jan 29 2019 17:38
    vladimir-popov commented #1406
Gavin Bisesi
@Daenyth
*almost
return type differs
G[A] => (A => F[B]) => F[G[B]]
Stream[F, A] => (A => F[B]) => Stream[F, B]
Whatever function you'd pass to traverse, pass to evalMap instead
Perhaps more analogous to StateT's semiFlatMap
Ryan Peters
@sloshy
As @arosien mentioned (ty for the shout-out) the solution I usually end up using is something like EventState in fs2-es, which uses Ref instead of StateT. The library also has a (somewhat rudimentary) state cache based on when they were last used, which I find I usually end up wanting so I don't have to keep rehydrating my state.
I've tried putting StateT in streamy situations recently and I've come away with the general opinion to "just use Ref" all the time. Not that I'd never use StateT but for Stream in particular it poses some complexity issues.
Ukonn Ra
@UkonnRa
OK, I will try Ref, thanks a lot!
Ryan Peters
@sloshy
Here's the source of EventState which might point you in the right direction. It's a little obfuscated since I had to break it apart in a few places but the gist is I make a Ref with my initial state, I define a function to fold on my events with, and as events come in from an incoming stream (via hookup or one-at-a-time via doNext) it modifies the state. hydrated might be what you're looking for as it initializes the state given a stream of starting events you've provided for it.
James Cosford
@jamescosford

Hi! I'm running this quick and dirty app to send generated data to my server, which is marginally less quick and dirty:

//... some stuff omitted, probably not relevant to the issue

  def streamForDev(app: String, name: String, path: SimGPSPath, rate: FiniteDuration): Stream[F, Unit] = 
    Stream
      .unfoldEval[F, SimGPSState, SimGPSPoint](stateForPath(path))(stepFunc(rate))
      .metered(rate)
      .evalMap(o => toReport(app = app, dev = name, o))
      .evalMap(o => ReportSubmitter[F].reportSubmit(o)) // 
      .map(_ => ())

  def run: F[Unit] = Stream[F, Stream[F, Unit]](Paths.devices.map {
    case (app, dev, path) => streamForDev(app, dev, path, 2.seconds)
  } : _*).parJoinUnbounded.compile.drain
}

object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = new Application[IO].run.map(_ => ExitCode.Success)
}

The issue is, when I run the app through sbt, and then ctrl+c to kill it, the server keeps receiving messages! Eventually, when I kill the server process this process dies because it can't connect anymore. How can I kill this one cleanly in response to a ctrl+c?

Michael Pilquist
@mpilquist
Try adding fork in run := true to build.sbt
The default behavior of run is to not fork a JVM and instead just call your main method. When you ctrl-c, SBT tries interrupting the main thread, which doesn't work well with IOApp
Add outputStrategy := Some(StdoutOutput) too
Some discussion here if interested more in why it's happening: https://gitter.im/typelevel/cats-effect?at=5dd2eb9ceeb63e1a83d878f3
James Cosford
@jamescosford
OK great I will try those, and read that too.
Thanks @mpilquist
Awesome, fork worked!
Ion Alberdi
@yetanotherion

Hello, we managed to reproduce a bug we have in production in the snippet below

import cats.effect._
import fs2._
import scala.concurrent.duration._

sealed trait LeftOrRight
case object Left extends LeftOrRight
case object Right extends LeftOrRight

object Hello extends IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    val left: Stream[IO, LeftOrRight] = Stream.eval(IO.sleep(1.milli).map(_ => Left))
    val right: Stream[IO, LeftOrRight] = Stream.awakeEvery[IO](1.milli).map(_ => Right)
    val main: Stream[IO, Unit] = left.mergeHaltL(right).last.flatMap({
      case Some(Left) => Stream.emit(())
      case Some(Right) => Stream.raiseError[IO](new Exception("Stream ends with Right"))
      case None => Stream.raiseError[IO](new Exception("Unexpected empty stream"))
    })
    main.repeatN(10)
      .compile.drain
      .map(_ => ExitCode.Success)
  }
}

By executing it, it ends with "Stream ends with Right", meaning the last element from the stream comes from the right.
AFAIU, reading the implementation

def mergeHaltL[F2[x] >: F[x]: Concurrent, O2 >: O](that: Stream[F2, O2]): Stream[F2, O2] =
    noneTerminate.merge(that.map(Some(_))).unNoneTerminate

the behavior can be expected (the last element from the stream comes from the left indeed.
However this element is None, and an element from the right stream could have been pulled before which would trigger the behavior described above).
Do you think it's a bug?

Fabio Labella
@SystemFw
@yetanotherion I don't think it is
mergeHaltR and mergeHaltL aren't really giving you a guarantee about from which side the last element is coming from
they are only telling you which side is going to trigger interruption

However this element is None, and an element from the right stream could have been pulled before which would trigger the behavior described above

what I mean is that this statement is correct, and it describes the behaviour you see

Brandon G
@gannicottb

Is this still true?

Most concurrent queues in FS2 support tracking their size, which is handy for implementing size-based throttling of the producer.

Or to cut to the chase, what's the easiest way to enqueue a None to a NoneTerminatedQueue when the stream runs out of items?
Brandon G
@gannicottb
so far I've done this:
(stream.take(10).map(item => Some(item)) ++ Stream(None)).evalTap(e => q.enqueue1(e)).compile.drain
I was hoping that maybe I could skip the mapping and concatenation on the source stream, and do the Option wrapping/None passing in the evalTap
Brandon G
@gannicottb
is it appropriate to use onFinalize to enqueue the terminating None?
stream.take(10).evalTap(e => q.enqueue1(Some(e))).onFinalize(q.enqueue1(None)).compile.drain
Fabio Labella
@SystemFw
@gannicottb your first code is equivalent to the noneTerminate combinator
onFinalize is perhaps more appropriate in this case
Brandon G
@gannicottb
haha, didn't know .noneTerminate existed. the onFinalize approach at least doesn't leave the queue hanging if something happens to the stream, might go that way for now
Ion Alberdi
@yetanotherion

mergeHaltR and mergeHaltL aren't really giving you a guarantee about from which side the last element is coming from they are only telling you which side is going to trigger interruption

@SystemFw thank you, do you think it would be worth mentioning somewhere in the code / doc? I'd be glad to give it a try (if it would be welcome)

Fabio Labella
@SystemFw
why not, clarifying scaladoc is always useful
Paulius Imbrasas
@CremboC
is there something like scan, but the output type is not the same as the state? i.e.
def scan2[O2, O3](z: O2)(f: (O2, O) => (O2, O3)): Stream[F, O3]
Ion Alberdi
@yetanotherion

why not, clarifying scaladoc is always useful

functional-streams-for-scala/fs2#1837

Fabio Labella
@SystemFw
@CremboC isn't that basically mapAccumulate?
Paulius Imbrasas
@CremboC
yeah, it is, just need to do a map afterwards, but good enough :P
mapAccumulate returns Stream[F, (O2, O3)]
otto-dev
@otto-dev
Hi guys. Is there a way how I can apply a stateful mapping function, such as (S, A) => Option[(S, B)] to each element, where S is some state that is carried along for each element, based on the previous result, while B is the element to be emitted? Similar to scanChunksOpt, but applying to elements, not chunks
otto-dev
@otto-dev
@CremboC Hey, that's exactly my question as well, just in different words! mapAccumulate is a good start, but did you find any other approach?
Fabio Labella
@SystemFw
I would go with mapAccumulate(..).unNone.map(_._2)
but I'm not sure what the meaning of Option is in your example
otto-dev
@otto-dev
Thanks for your tip. The meaning of None is end-of-stream
But that's not the focus of my question anyway. It's just the signature that I'm familiar with. Primary question was how to map statefully
otto-dev
@otto-dev
mapAccumulate on Chunk seems to have the signature @CremboC and I are looking for
Gavin Bisesi
@Daenyth
Is A != B?
unfold or unfoldEval may be part of what you want
you could also have a stream with (non-concurrent) effects in StateT maybe
otto-dev
@otto-dev
Yeah it's like unfold, but on a stream
Gavin Bisesi
@Daenyth
Right, Stream's companion object has such methods