Where communities thrive

  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
Repo info
  • Feb 01 2021 10:11
    @SystemFw banned @Hudsone_gitlab
  • Jan 31 2019 04:19
    404- forked
  • Jan 31 2019 03:01
    SethTisue commented #1232
  • Jan 30 2019 17:22
  • Jan 30 2019 13:45
  • Jan 30 2019 10:48
    pchlupacek commented #1406
  • Jan 30 2019 10:47
    pchlupacek commented #1406
  • Jan 30 2019 10:39
    pchlupacek commented #1407
  • Jan 30 2019 09:58
    lJoublanc commented #870
  • Jan 30 2019 09:42
    vladimir-popov commented #1407
  • Jan 30 2019 08:10
    vladimir-popov closed #1407
  • Jan 30 2019 08:10
    vladimir-popov commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 18:57
    SystemFw commented #1406
  • Jan 29 2019 17:47
    pchlupacek commented #1406
  • Jan 29 2019 17:42
    pchlupacek commented #1406
  • Jan 29 2019 17:39
    pchlupacek commented #1407
  • Jan 29 2019 17:39
    vladimir-popov edited #1407
  • Jan 29 2019 17:38
    vladimir-popov commented #1406
Ukonn Ra
Well, let me try evalMap first, maybe I misunderstood something
Fabio Labella
sure, feel free to post a scastie or a snippet if you're struggling further :)
Ukonn Ra
Well just like this piece of code, https://scastie.scala-lang.org/Nuz2i83yTnWs0IPtOUhPBg, I just have no idea what the evalMap is... Any advice?
Gavin Bisesi
That evalMap does nothing useful
takes an element, wraps it in F[_], then evaluates the F to get the element back
All your StateT are using pure so it might be easier to skip the T and just use State
Fabio Labella
@UkonnRa I thought you had a stream of elements, that you are operating on with StateT
if you just have three stateT computations, you can Stream.eval(first) ++ Stream.eval(second) ++ Stream.eval(third) but then why are you using Stream? Could just have first >> second >> third
in the code you have, you could make it work with evalMap(identity) btw (rather than pure)
if you want a slightly more meaningful example
try writing a Int => StateT that takes an Int and inc/decr by that int
then you can do Stream(1, -1, 2, 1, -1).evalMap(yourFunction) and see
or, in case you want to follow the example in stack overflow, List.fill(5)(yourStateT).sequence becomes Stream.repeatEval(yourStateT).take(5)
(or equivalently Stream.eval(yourStateT).repeat.take(5)
Ukonn Ra
Well, my real-world situation is like CQRS/ES, I have a bunch of Events in the DB, and sometime I have to read them out (into a Stream) and "rehydrate" it to the latest state.
The only solution I can figure out is like https://scastie.scala-lang.org/6eIjulhSRfqJRotHerKNaA
Which should do runS over and over again
Adam Rosien
https://github.com/sloshy/fs2-es/ has some good patterns for ES in fs2
to re-use or steal
Gavin Bisesi
@UkonnRa ES events may also be easier without StateT unless you already have such functions. Note that (init state) => List[Event] => (final state) is the same signature as foldLeft more or less, so you can do eventStream.fold(init)(update).compile.lastOrError
Ukonn Ra
Yes, I thought it would be interesting if using StateT, but maybe it is not suitable in this situation
Gavin Bisesi
IMO State monad is often overkill
I generally never start with a monad transformer, I write the raw version first and use transformers where I see the opportunity to factor out repeated structure
Ukonn Ra
It would be nice to use traverse function with StateT, but unfortunatelly, Stream have no
Gavin Bisesi
Traverse's signature is the same as evalMap
return type differs
G[A] => (A => F[B]) => F[G[B]]
Stream[F, A] => (A => F[B]) => Stream[F, B]
Whatever function you'd pass to traverse, pass to evalMap instead
Perhaps more analogous to StateT's semiFlatMap
Ryan Peters
As @arosien mentioned (ty for the shout-out) the solution I usually end up using is something like EventState in fs2-es, which uses Ref instead of StateT. The library also has a (somewhat rudimentary) state cache based on when they were last used, which I find I usually end up wanting so I don't have to keep rehydrating my state.
I've tried putting StateT in streamy situations recently and I've come away with the general opinion to "just use Ref" all the time. Not that I'd never use StateT but for Stream in particular it poses some complexity issues.
Ukonn Ra
OK, I will try Ref, thanks a lot!
Ryan Peters
Here's the source of EventState which might point you in the right direction. It's a little obfuscated since I had to break it apart in a few places but the gist is I make a Ref with my initial state, I define a function to fold on my events with, and as events come in from an incoming stream (via hookup or one-at-a-time via doNext) it modifies the state. hydrated might be what you're looking for as it initializes the state given a stream of starting events you've provided for it.
James Cosford

Hi! I'm running this quick and dirty app to send generated data to my server, which is marginally less quick and dirty:

//... some stuff omitted, probably not relevant to the issue

  def streamForDev(app: String, name: String, path: SimGPSPath, rate: FiniteDuration): Stream[F, Unit] = 
      .unfoldEval[F, SimGPSState, SimGPSPoint](stateForPath(path))(stepFunc(rate))
      .evalMap(o => toReport(app = app, dev = name, o))
      .evalMap(o => ReportSubmitter[F].reportSubmit(o)) // 
      .map(_ => ())

  def run: F[Unit] = Stream[F, Stream[F, Unit]](Paths.devices.map {
    case (app, dev, path) => streamForDev(app, dev, path, 2.seconds)
  } : _*).parJoinUnbounded.compile.drain

object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = new Application[IO].run.map(_ => ExitCode.Success)

The issue is, when I run the app through sbt, and then ctrl+c to kill it, the server keeps receiving messages! Eventually, when I kill the server process this process dies because it can't connect anymore. How can I kill this one cleanly in response to a ctrl+c?

Michael Pilquist
Try adding fork in run := true to build.sbt
The default behavior of run is to not fork a JVM and instead just call your main method. When you ctrl-c, SBT tries interrupting the main thread, which doesn't work well with IOApp
Add outputStrategy := Some(StdoutOutput) too
Some discussion here if interested more in why it's happening: https://gitter.im/typelevel/cats-effect?at=5dd2eb9ceeb63e1a83d878f3
James Cosford
OK great I will try those, and read that too.
Thanks @mpilquist
Awesome, fork worked!
Ion Alberdi

Hello, we managed to reproduce a bug we have in production in the snippet below

import cats.effect._
import fs2._
import scala.concurrent.duration._

sealed trait LeftOrRight
case object Left extends LeftOrRight
case object Right extends LeftOrRight

object Hello extends IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    val left: Stream[IO, LeftOrRight] = Stream.eval(IO.sleep(1.milli).map(_ => Left))
    val right: Stream[IO, LeftOrRight] = Stream.awakeEvery[IO](1.milli).map(_ => Right)
    val main: Stream[IO, Unit] = left.mergeHaltL(right).last.flatMap({
      case Some(Left) => Stream.emit(())
      case Some(Right) => Stream.raiseError[IO](new Exception("Stream ends with Right"))
      case None => Stream.raiseError[IO](new Exception("Unexpected empty stream"))
      .map(_ => ExitCode.Success)

By executing it, it ends with "Stream ends with Right", meaning the last element from the stream comes from the right.
AFAIU, reading the implementation

def mergeHaltL[F2[x] >: F[x]: Concurrent, O2 >: O](that: Stream[F2, O2]): Stream[F2, O2] =

the behavior can be expected (the last element from the stream comes from the left indeed.
However this element is None, and an element from the right stream could have been pulled before which would trigger the behavior described above).
Do you think it's a bug?

Fabio Labella
@yetanotherion I don't think it is
mergeHaltR and mergeHaltL aren't really giving you a guarantee about from which side the last element is coming from
they are only telling you which side is going to trigger interruption

However this element is None, and an element from the right stream could have been pulled before which would trigger the behavior described above

what I mean is that this statement is correct, and it describes the behaviour you see

Brandon G

Is this still true?

Most concurrent queues in FS2 support tracking their size, which is handy for implementing size-based throttling of the producer.

Or to cut to the chase, what's the easiest way to enqueue a None to a NoneTerminatedQueue when the stream runs out of items?