Gavin Bisesi
@Daenyth
mapAccumulate + unNoneTerminate does sound close to right
otto-dev
@otto-dev
Thanks for your lead
I think I have got it now, digging the source code a bit using the IDE
Here is what I've got. As a simple stateful example, say you want to calculate the Int difference between the current element and the previous one:
def differenceToLast(last: Option[Int], current: Int): (Some[Int], Option[Int]) = (Some(current), last.map(current - _))
Then the most direct way I've found, without "unnecessary" element-wise tuple creation is this:
Gavin Bisesi
@Daenyth
Well, in that example I'd use zipWithNext or zipWithPrevious
and then map
otto-dev
@otto-dev
Stream(1, 4, 3, 4).mapChunks(c => c.mapAccumulate(None: Option[Int])(differenceToLast)._2)
Gavin Bisesi
@Daenyth
why on the chunks?
otto-dev
@otto-dev
The chunk signature is different. On Chunk it returns the final state plus a chunk of results, while on Streams every element is a tuple of state and result
Gavin Bisesi
@Daenyth
Right, because streams can be infinite, so each element is a "step", while Chunks are finite strict collections
you can .map(_._2) on the stream after accumulating
but if you only need pairwise I'd definitely zipWithNext/previous
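For reference, a minimal sketch of the two library-based approaches mentioned here, assuming fs2 is on the classpath. The object and value names are made up for illustration; only zipWithPrevious, mapAccumulate, map and toList come from fs2.

```scala
import fs2.Stream

object DifferenceToPrevious {
  // zipWithPrevious pairs each element with the one before it (None for the first).
  val viaZip: List[Option[Int]] =
    Stream(1, 4, 3, 4).zipWithPrevious
      .map { case (prev, cur) => prev.map(cur - _) }
      .toList

  // mapAccumulate emits (state, result) tuples; map(_._2) keeps only the result.
  val viaAccumulate: List[Option[Int]] =
    Stream(1, 4, 3, 4)
      .mapAccumulate(Option.empty[Int])((last, cur) => (Some(cur), last.map(cur - _)))
      .map(_._2)
      .toList
}
```

Both values should come out as List(None, Some(3), Some(-1), Some(1)).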
otto-dev
@otto-dev
Maybe this is a case of over-thinking it, but I don't like the tuple creation just to destroy it right after. I wonder if that makes much of a performance difference, e.g. for numeric computations
What do you think? Premature optimization?
otto-dev
@otto-dev
Also, I have to use scanChunks instead of mapChunks, like so:
  def differenceToLast(last: Option[Int], current: Int): (Some[Int], Option[Int]) = (Some(current), last.map(current - _))
  Stream(1, 4, 3, 4).scanChunks(None: Option[Int])((acc, c) => c.mapAccumulate(acc)(differenceToLast))
@CremboC Maybe the above helps you as well. It's stolen from the mapAccumulate implementation in Stream
Gavin Bisesi
@Daenyth
super premature
Profile it if it's not good enough
Simple code first, reuse library methods before doing something more complex
otto-dev
@otto-dev
Yeah agreed
otto-dev
@otto-dev
  // takes (S, A) => (S, B) to create Stream of B
  def statefulMap[F[_], S, A, B](s: Stream[F, A])(init: S)(f: (S, A) => (S, B)): Stream[F, B] = {
    s.scanChunks(init)((acc, c) => c.mapAccumulate(acc)(f))
  }
Fabio Labella
@SystemFw
I stand by my recommendation of using mapAccumulate
also, a lot of combinators are already optimised for chunkiness under the hood
Gavin Bisesi
@Daenyth
What happens to your version if I give you a stream where I did s.chunkN(1).flatMap(Stream.chunk)
I think if you're using scanChunks without specifying a chunkiness yourself, you're going to be real prone to misbehavior
otto-dev
@otto-dev
Should be exactly the same as mapAccumulate except for the return type @Daenyth https://github.com/functional-streams-for-scala/fs2/blob/master/core/shared/src/main/scala/fs2/Stream.scala#L1782-L1788
Gavin Bisesi
@Daenyth
ah
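A quick way to check the chunking question, sketched under the assumption that statefulMap is defined as posted above: run it on the same elements once with the default chunking and once with singleton chunks. Since scanChunks threads the state from one chunk to the next, both runs should agree. The object and value names are illustrative only.

```scala
import fs2.Stream

object ChunkingCheck {
  // Same definition as in the discussion above.
  def statefulMap[F[_], S, A, B](s: Stream[F, A])(init: S)(f: (S, A) => (S, B)): Stream[F, B] =
    s.scanChunks(init)((acc, c) => c.mapAccumulate(acc)(f))

  def differenceToLast(last: Option[Int], current: Int): (Option[Int], Option[Int]) =
    (Some(current), last.map(current - _))

  val source = Stream(1, 4, 3, 4)

  // All four elements arrive in a single chunk.
  val oneChunk: List[Option[Int]] =
    statefulMap(source)(Option.empty[Int])(differenceToLast).toList

  // Re-chunk into singletons, as in the question above.
  val singletons: List[Option[Int]] =
    statefulMap(source.chunkN(1).flatMap(c => Stream.chunk(c)))(Option.empty[Int])(differenceToLast).toList
}
```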
semenodm
@semenodm
Hello community, hope everyone is doing well in these tough days. I have a question: what would be the best way to express the following behavior with fs2? I need to accumulate messages in memory while there is activity in the stream, then run some effectful action on the buffered messages once no new messages have arrived within a given period of time (say 10 minutes), clean up the buffer, and continue doing this
Ryan Peters
@sloshy
I'm not sure if anything has been added recently, but you might want to look into chrisdavenport/agitation
It allows you to have "resettable timers". You can enqueue things in memory using a cats.effect.concurrent.Ref or fs2.concurrent.Queue. As elements come in, you reset the timer, and then when the timer expires you can trigger flushing it.
Ryan Peters
@sloshy
New issue functional-streams-for-scala/fs2#1838 as discussed on the cats effect gitter (thanks @Daenyth)
Ryan Peters
@sloshy
(heck, thanks for the super speedy response @mpilquist)
Normally I have to wait a week or five on some projects for those
Michael Pilquist
@mpilquist
:) happened to need a small OSS break at the end of a long work day
Ryan Peters
@sloshy
You're all doing great, keep up the good work btw
Hope everything is good in these covid-19 times
Michael Pilquist
@mpilquist
Likewise :heart:
Ryan Peters
@sloshy
RE: #1838 would it be possible, or even make sense, for there to be some type that is equivalent to Pure but does not have an implicit compiler? So it could be used in pipes, or aliased as something like type PurePipe[A, B] = Pipe[PureButNotCompilableOrSomething, A, B] including the implicit covary as before
completely forgot there was an fs2-dev channel, should probably have asked this there.
Michael Pilquist
@mpilquist
That assumes compilation is the only contravariant use case though
note you can use F = Nothing for that too, though inference might break more often
Michael Pilquist
@mpilquist
@sloshy Here's such an example (started on this yesterday but got pulled away):
def makeEverythingIO[A]: Pipe[Pure, A, Stream[IO, A]] = in => Stream(in.covary[IO])
matfournier
@matfournier
If I need to restart a Stream (say, reconnect to SQS, whose connection I'm wrapping and streaming from) on a certain failure, what's the best way to do this? I keep looking at bracket, but it's not so much that I want to release something, more that I want to reacquire it. I'm probably overcomplicating this
matfournier
@matfournier
This is probably a more general cats-effect question (so I've asked it there), I actually have no idea how to do resource re-acquisition using resource/bracket, only how to do cleanup. Time to do some sleuthing!
ybasket
@ybasket
@matfournier Sounds like just handling the error might be enough, you can find an example of this in https://github.com/profunktor/fs2-rabbit/blob/master/core/src/main/scala/dev/profunktor/fs2rabbit/resiliency/ResilientStream.scala
fs2 has HotSwap for chains of resources, but I'm not sure what the wiring would look like for your case. See functional-streams-for-scala/fs2#1667
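A minimal sketch of the "just handle the error" idea, assuming the connection is acquired inside the stream itself (for example via Stream.resource or Stream.bracket), so that running the stream again also reacquires it. restartOnError is a hypothetical helper name, not part of fs2; only handleErrorWith is a library call.

```scala
import fs2.Stream

object Restart {
  // Hypothetical helper: when `s` fails, run it again from the beginning.
  // No retry limit or backoff here; a real version would likely want both.
  def restartOnError[F[_], A](s: Stream[F, A]): Stream[F, A] =
    s.handleErrorWith(_ => restartOnError(s))
}
```

Elements emitted before the failure are not replayed downstream; the restarted stream simply continues emitting after them. To restart only on certain failures, the handler could match on the error and re-raise the rest.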
matfournier
@matfournier
Thanks!