Fabio Labella
@SystemFw
in the code you have, you could make it work with evalMap(identity) btw (rather than pure)
if you want a slightly more meaningful example
try writing an Int => StateT that takes an Int and inc/decr the state by that int
then you can do Stream(1, -1, 2, 1, -1).evalMap(yourFunction) and see
or, in case you want to follow the example in stack overflow, List.fill(5)(yourStateT).sequence becomes Stream.repeatEval(yourStateT).take(5)
(or equivalently Stream.eval(yourStateT).repeat.take(5))
Ukonn Ra
@UkonnRa
Well, my real-world situation is like CQRS/ES: I have a bunch of Events in the DB, and sometimes I have to read them out (into a Stream) and "rehydrate" them into the latest state.
The only solution I can figure out is like https://scastie.scala-lang.org/6eIjulhSRfqJRotHerKNaA
which has to call runS over and over again
Adam Rosien
@arosien
https://github.com/sloshy/fs2-es/ has some good patterns for ES in fs2
to re-use or steal
Gavin Bisesi
@Daenyth
@UkonnRa ES events may also be easier without StateT unless you already have such functions. Note that (init state) => List[Event] => (final state) is the same signature as foldLeft more or less, so you can do eventStream.fold(init)(update).compile.lastOrError
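A minimal sketch of the fold approach described above, assuming a hypothetical bank-account event model (Event, Deposited, Withdrawn and applyEvent are made up for illustration and do not come from the chat). It uses a pure stream so the result can be read with toList; with an effectful stream of events you would end with .compile.lastOrError as in the message.

```scala
import fs2.Stream

object FoldRehydrate {
  // Hypothetical event model, for illustration only.
  sealed trait Event
  final case class Deposited(amount: Int) extends Event
  final case class Withdrawn(amount: Int) extends Event

  // (state, event) => state: the same shape as the function passed to foldLeft.
  def applyEvent(balance: Int, event: Event): Int = event match {
    case Deposited(a) => balance + a
    case Withdrawn(a) => balance - a
  }

  val events: List[Event] = List(Deposited(100), Withdrawn(30), Deposited(5))

  // fold emits exactly one element: the state after all events were applied.
  val finalState: Int = Stream.emits(events).fold(0)(applyEvent).toList.head
}
```

Here finalState is 75 (0 + 100 - 30 + 5).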
Ukonn Ra
@UkonnRa
Yes, I thought it would be interesting if using StateT, but maybe it is not suitable in this situation
Gavin Bisesi
@Daenyth
IMO State monad is often overkill
I generally never start with a monad transformer, I write the raw version first and use transformers where I see the opportunity to factor out repeated structure
Ukonn Ra
@UkonnRa
It would be nice to use the traverse function with StateT, but unfortunately, Stream doesn't have one
Gavin Bisesi
@Daenyth
Traverse's signature is the same as evalMap
*almost
return type differs
G[A] => (A => F[B]) => F[G[B]]
Stream[F, A] => (A => F[B]) => Stream[F, B]
Whatever function you'd pass to traverse, pass to evalMap instead
Perhaps more analogous to StateT's semiFlatMap
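A small sketch of the comparison above: the same effectful function is passed to List.traverse and to Stream.evalMap. The label function is hypothetical. SyncIO is used here only so the results can be run synchronously without any runtime setup; the same code shape applies to IO.

```scala
import cats.effect.SyncIO
import cats.implicits._
import fs2.Stream

object TraverseVsEvalMap {
  // Hypothetical effectful function of shape A => F[B].
  def label(n: Int): SyncIO[String] = SyncIO(s"event-$n")

  // List[A] => (A => F[B]) => F[List[B]]
  val viaTraverse: List[String] =
    List(1, 2, 3).traverse(n => label(n)).unsafeRunSync()

  // Stream[F, A] => (A => F[B]) => Stream[F, B]; compiling to a list recovers F[List[B]].
  val viaEvalMap: List[String] =
    Stream(1, 2, 3).covary[SyncIO].evalMap(n => label(n)).compile.toList.unsafeRunSync()
}
```

Both values contain the same three labels in the same order; the difference is that evalMap keeps the result inside the stream until it is compiled.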
Ryan Peters
@sloshy
As @arosien mentioned (ty for the shout-out) the solution I usually end up using is something like EventState in fs2-es, which uses Ref instead of StateT. The library also has a (somewhat rudimentary) state cache based on when they were last used, which I find I usually end up wanting so I don't have to keep rehydrating my state.
I've tried putting StateT in streamy situations recently and I've come away with the general opinion to "just use Ref" all the time. Not that I'd never use StateT but for Stream in particular it poses some complexity issues.
Ukonn Ra
@UkonnRa
OK, I will try Ref, thanks a lot!
Ryan Peters
@sloshy
Here's the source of EventState which might point you in the right direction. It's a little obfuscated since I had to break it apart in a few places but the gist is I make a Ref with my initial state, I define a function to fold on my events with, and as events come in from an incoming stream (via hookup or one-at-a-time via doNext) it modifies the state. hydrated might be what you're looking for as it initializes the state given a stream of starting events you've provided for it.
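The general shape described above (a Ref holding the state, a fold function, and a stream of events that modifies the Ref) might look roughly like the sketch below. This is not the fs2-es EventState API: the event model, applyEvent and rehydrate are hypothetical names. It assumes cats-effect 2, where Ref lives in cats.effect.concurrent (in cats-effect 3 the import is cats.effect.Ref), and uses SyncIO only to keep the example runnable without extra setup.

```scala
import cats.effect.SyncIO
import cats.effect.concurrent.Ref // cats-effect 2 location; cats-effect 3 uses cats.effect.Ref
import fs2.Stream

object RefRehydrate {
  // Hypothetical event model, for illustration only.
  sealed trait Event
  final case class Deposited(amount: Int) extends Event
  final case class Withdrawn(amount: Int) extends Event

  def applyEvent(balance: Int, event: Event): Int = event match {
    case Deposited(a) => balance + a
    case Withdrawn(a) => balance - a
  }

  // Create a Ref with the initial state, apply every event to it, then read it back.
  def rehydrate(init: Int, events: Stream[SyncIO, Event]): SyncIO[Int] =
    for {
      ref   <- Ref.of[SyncIO, Int](init)
      _     <- events.evalMap(e => ref.update(s => applyEvent(s, e))).compile.drain
      state <- ref.get
    } yield state

  val result: Int =
    rehydrate(0, Stream[SyncIO, Event](Deposited(100), Withdrawn(30))).unsafeRunSync()
}
```

Here result is 70. Unlike the fold version, the Ref stays available after the initial events are consumed, so later events can keep updating the same state.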
James Cosford
@jamescosford

Hi! I'm running this quick and dirty app to send generated data to my server, which is marginally less quick and dirty:

//... some stuff omitted, probably not relevant to the issue

  def streamForDev(app: String, name: String, path: SimGPSPath, rate: FiniteDuration): Stream[F, Unit] = 
    Stream
      .unfoldEval[F, SimGPSState, SimGPSPoint](stateForPath(path))(stepFunc(rate))
      .metered(rate)
      .evalMap(o => toReport(app = app, dev = name, o))
      .evalMap(o => ReportSubmitter[F].reportSubmit(o))
      .map(_ => ())

  def run: F[Unit] = Stream[F, Stream[F, Unit]](Paths.devices.map {
    case (app, dev, path) => streamForDev(app, dev, path, 2.seconds)
  } : _*).parJoinUnbounded.compile.drain
}

object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = new Application[IO].run.map(_ => ExitCode.Success)
}

The issue is, when I run the app through sbt, and then ctrl+c to kill it, the server keeps receiving messages! Eventually, when I kill the server process this process dies because it can't connect anymore. How can I kill this one cleanly in response to a ctrl+c?

Michael Pilquist
@mpilquist
Try adding fork in run := true to build.sbt
The default behavior of run is to not fork a JVM and instead just call your main method. When you ctrl-c, SBT tries interrupting the main thread, which doesn't work well with IOApp
Add outputStrategy := Some(StdoutOutput) too
Some discussion here if you're interested in why it's happening: https://gitter.im/typelevel/cats-effect?at=5dd2eb9ceeb63e1a83d878f3
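Put together, the two settings suggested above would sit in build.sbt roughly as follows. This is a sketch: the first line uses the sbt 1.x slash syntax, which expresses the same setting as fork in run := true.

```scala
// build.sbt

// Run the application in a separate JVM instead of inside the sbt process,
// so that ctrl+c reaches the application itself.
run / fork := true

// Send the forked process's output straight to stdout.
outputStrategy := Some(StdoutOutput)
```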
James Cosford
@jamescosford
OK great I will try those, and read that too.
Thanks @mpilquist
Awesome, fork worked!
Ion Alberdi
@yetanotherion

Hello, we managed to reproduce a bug we have in production in the snippet below

import cats.effect._
import fs2._
import scala.concurrent.duration._

sealed trait LeftOrRight
case object Left extends LeftOrRight
case object Right extends LeftOrRight

object Hello extends IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    val left: Stream[IO, LeftOrRight] = Stream.eval(IO.sleep(1.milli).map(_ => Left))
    val right: Stream[IO, LeftOrRight] = Stream.awakeEvery[IO](1.milli).map(_ => Right)
    val main: Stream[IO, Unit] = left.mergeHaltL(right).last.flatMap({
      case Some(Left) => Stream.emit(())
      case Some(Right) => Stream.raiseError[IO](new Exception("Stream ends with Right"))
      case None => Stream.raiseError[IO](new Exception("Unexpected empty stream"))
    })
    main.repeatN(10)
      .compile.drain
      .map(_ => ExitCode.Success)
  }
}

When executed, it fails with "Stream ends with Right", meaning the last element from the stream comes from the right.
AFAIU, reading the implementation

def mergeHaltL[F2[x] >: F[x]: Concurrent, O2 >: O](that: Stream[F2, O2]): Stream[F2, O2] =
    noneTerminate.merge(that.map(Some(_))).unNoneTerminate

the behavior can be expected: the last element from the merged stream does come from the left.
However this element is None, and an element from the right stream could have been pulled before which would trigger the behavior described above.
Do you think it's a bug?

Fabio Labella
@SystemFw
@yetanotherion I don't think it is
mergeHaltR and mergeHaltL aren't really giving you a guarantee about which side the last element is coming from
they are only telling you which side is going to trigger interruption

However this element is None, and an element from the right stream could have been pulled before which would trigger the behavior described above

what I mean is that this statement is correct, and it describes the behaviour you see

Brandon G
@gannicottb

Is this still true?

Most concurrent queues in FS2 support tracking their size, which is handy for implementing size-based throttling of the producer.

Or to cut to the chase, what's the easiest way to enqueue a None to a NoneTerminatedQueue when the stream runs out of items?
Brandon G
@gannicottb
so far I've done this:
(stream.take(10).map(item => Some(item)) ++ Stream(None)).evalTap(e => q.enqueue1(e)).compile.drain
I was hoping that maybe I could skip the mapping and concatenation on the source stream, and do the Option wrapping/None passing in the evalTap
Brandon G
@gannicottb
is it appropriate to use onFinalize to enqueue the terminating None?
stream.take(10).evalTap(e => q.enqueue1(Some(e))).onFinalize(q.enqueue1(None)).compile.drain
Fabio Labella
@SystemFw
@gannicottb your first code is equivalent to the noneTerminate combinator
onFinalize is perhaps more appropriate in this case
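To illustrate the equivalence mentioned above, here is a small sketch on a pure stream (no queue involved, and the object and value names are made up): wrapping every element in Some and appending a single None gives the same output as noneTerminate, and unNoneTerminate undoes it.

```scala
import fs2.{Pure, Stream}

object NoneTerminateDemo {
  val source: Stream[Pure, Int] = Stream(1, 2, 3)

  // The manual version: wrap every element in Some, then append one None.
  val manual: List[Option[Int]] =
    (source.map(i => Some(i): Option[Int]) ++ Stream(None)).toList

  // The built-in combinator.
  val viaCombinator: List[Option[Int]] = source.noneTerminate.toList

  // unNoneTerminate stops at the first None and unwraps the rest.
  val roundTrip: List[Int] = source.noneTerminate.unNoneTerminate.toList
}
```

Both manual and viaCombinator are List(Some(1), Some(2), Some(3), None). Note that the trailing None is only emitted when the source completes normally, whereas a finalizer registered with onFinalize also runs when the stream fails or is interrupted.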
Brandon G
@gannicottb
haha, didn't know .noneTerminate existed. the onFinalize approach at least doesn't leave the queue hanging if something happens to the stream, might go that way for now
Ion Alberdi
@yetanotherion

mergeHaltR and mergeHaltL aren't really giving you a guarantee about which side the last element is coming from; they are only telling you which side is going to trigger interruption

@SystemFw thank you, do you think it would be worth mentioning somewhere in the code / doc? I'd be glad to give it a try (if it would be welcome)

Fabio Labella
@SystemFw
why not, clarifying scaladoc is always useful