otto-dev
@otto-dev
mapAccumulate on Chunk seems to have the signature @CremboC and I are looking for
Gavin Bisesi
@Daenyth
Is A != B?
unfold or unfoldEval may be part of what you want
you could also have a stream with (non-concurrent) effects in StateT maybe
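For illustration, a minimal sketch of the Stream.unfold route (assuming fs2 1.x/2.x):
  import fs2.{Pure, Stream}

  // unfold builds a stream from a seed: the step function returns
  // Some((element, nextState)) to emit, or None to terminate
  val counted: Stream[Pure, Int] =
    Stream.unfold(0)(s => if (s < 5) Some((s, s + 1)) else None) // emits 0, 1, 2, 3, 4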
otto-dev
@otto-dev
Yeah it's like unfold, but on a stream
Gavin Bisesi
@Daenyth
Right, Stream's companion object has such methods
otto-dev
@otto-dev
Yes, but that would mean that the actual stream instance becomes part of the state passed as zero into unfold, which becomes quite clumsy
Gavin Bisesi
@Daenyth
I don't follow
Do you mean that you have a stream of elements and you want to apply the state transformations over them?
otto-dev
@otto-dev
Yes, that's right
Gavin Bisesi
@Daenyth
rather than using unfold to produce the stream in the first place from a seed?
ok, makes sense
otto-dev
@otto-dev
You got me
Gavin Bisesi
@Daenyth
mapAccumulate + unNoneTerminate does sound close to right
otto-dev
@otto-dev
Thanks for the lead
I think I've got it now, after digging through the source code a bit with the IDE
Here is what I've got. Say, as a simple stateful example, you want to calculate the Int difference between the current element and the previous one:
def differenceToLast(last: Option[Int], current: Int): (Some[Int], Option[Int]) = (Some(current), last.map(current - _))
Then the most direct way I've found, without "unnecessary" element-wise tuple creation, is this:
Gavin Bisesi
@Daenyth
Well, in that example I'd use zipWithNext or zipWithPrevious
and then map
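As a sketch of that suggestion applied to the difference example (assuming fs2 1.x/2.x):
  import fs2.Stream

  // zipWithPrevious pairs each element with the one before it (None for the first),
  // so no explicit accumulator is needed for a purely pairwise computation
  Stream(1, 4, 3, 4).zipWithPrevious
    .map { case (prev, cur) => prev.map(cur - _) } // emits None, Some(3), Some(-1), Some(1)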
otto-dev
@otto-dev
Stream(1, 4, 3, 4).mapChunks(c => c.mapAccumulate(None: Option[Int])(differenceToLast)._2)
Gavin Bisesi
@Daenyth
why on the chunks?
otto-dev
@otto-dev
The Chunk signature is different: the output chunk contains just the results, while on Stream each emitted element is a tuple of state and result
Gavin Bisesi
@Daenyth
Right, because streams can be infinite, so each element is a "step", while Chunks are finite strict collections
you can .map(_._2) on the stream after accumulating
but if you only need pairwise I'd definitely zipWithNext/previous
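A sketch of that mapAccumulate + map(_._2) version, reusing differenceToLast from the snippet above:
  // Stream.mapAccumulate emits (state, result) pairs; dropping the state afterwards
  // leaves just the per-element results
  Stream(1, 4, 3, 4)
    .mapAccumulate(None: Option[Int])(differenceToLast)
    .map(_._2) // emits None, Some(3), Some(-1), Some(1)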
otto-dev
@otto-dev
Maybe this is a case of over-thinking it, but I don't like creating a tuple just to destroy it right after. I wonder whether that makes much of a performance difference, e.g. for numeric computations
What do you think? Premature optimization?
otto-dev
@otto-dev
Also, I have to use scanChunks instead of mapChunks, so the state is carried across chunk boundaries instead of being reset for every chunk, like so:
  def differenceToLast(last: Option[Int], current: Int): (Some[Int], Option[Int]) = (Some(current), last.map(current - _))
  Stream(1, 4, 3, 4).scanChunks(None: Option[Int])((acc, c) => c.mapAccumulate(acc)(differenceToLast))
@CremboC Maybe the above helps you as well. It's stolen from the mapAccumulate implementation in Stream
Gavin Bisesi
@Daenyth
super premature
Profile it if it's not good enough
Simple code first, reuse library methods before doing something more complex
otto-dev
@otto-dev
Yeah agreed
otto-dev
@otto-dev
  // takes (S, A) => (S, B) to create Stream of B
  def statefulMap[F[_], S, A, B](s: Stream[F, A])(init: S)(f: (S, A) => (S, B)): Stream[F, B] = {
    s.scanChunks(init)((acc, c) => c.mapAccumulate(acc)(f))
  }
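A quick usage sketch of that helper, reusing differenceToLast from earlier:
  // the helper hides the (state, result) plumbing entirely
  val diffs = statefulMap(Stream(1, 4, 3, 4))(None: Option[Int])(differenceToLast)
  // diffs emits None, Some(3), Some(-1), Some(1)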
Fabio Labella
@SystemFw
I stand by my recommendation of using mapAccumulate
also, a lot of combinators are already optimised for chunkiness under the hood
Gavin Bisesi
@Daenyth
What happens to your version if I give you a stream where I did s.chunkN(1).flatMap(Stream.chunk)?
I think if you're using scanChunks without specifying a chunkiness yourself, you're going to be real prone to misbehavior
otto-dev
@otto-dev
Should be exactly the same as mapAccumulate except for the return type @Daenyth https://github.com/functional-streams-for-scala/fs2/blob/master/core/shared/src/main/scala/fs2/Stream.scala#L1782-L1788
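A sketch of why the chunk structure should not matter here: scanChunks threads the state from one chunk into the next, so re-chunking the input leaves the emitted elements unchanged.
  val s         = Stream(1, 4, 3, 4)
  val rechunked = s.chunkN(1).flatMap(Stream.chunk) // force single-element chunks

  val a = statefulMap(s)(None: Option[Int])(differenceToLast)
  val b = statefulMap(rechunked)(None: Option[Int])(differenceToLast)
  // a and b both emit None, Some(3), Some(-1), Some(1)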
Gavin Bisesi
@Daenyth
ah
semenodm
@semenodm
Hello community, hope everyone is doing well these tough days. I have a question: what would be the best way to express the following behaviour with fs2? I need to accumulate messages in memory while there is activity in the stream, and perform some effectful action on the buffered messages once no new messages have been detected within a given period of time, say 10 minutes, then clean up the buffer and continue doing this
Ryan Peters
@sloshy
I'm not sure if anything has been added recently, but you might want to look into chrisdavenport/agitation
It allows you to have "resettable timers". You can enqueue things in memory using a cats.effect.concurrent.Ref or fs2.concurrent.Queue. As elements come in, you reset the timer, and then when the timer expires you can trigger flushing it.
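For comparison, a rough sketch using only built-in fs2 combinators (assuming fs2 2.x with cats-effect 2; batchAndFlush and the 1024 batch-size cap are made up for illustration). groupWithin flushes on a fixed time window or size cap rather than a true "reset on every message" inactivity timeout, so the resettable-timer approach above is the closer fit for the original question:
  import scala.concurrent.duration._
  import cats.effect.{Concurrent, Timer}
  import fs2.{Chunk, Stream}

  // buffer incoming messages and flush each batch effectfully when either the
  // size cap or the time window is reached
  def batchAndFlush[F[_]: Concurrent: Timer, A](
      in: Stream[F, A],
      flush: Chunk[A] => F[Unit] // effectful action on the buffered messages
  ): Stream[F, Unit] =
    in.groupWithin(1024, 10.minutes).evalMap(flush)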
Ryan Peters
@sloshy
New issue functional-streams-for-scala/fs2#1838 as discussed on the cats effect gitter (thanks @Daenyth)
Ryan Peters
@sloshy
(heck, thanks for the super speedy response @mpilquist)
Normally I have to wait a week or five on some projects for those
Michael Pilquist
@mpilquist
:) happened to need a small OSS break at the end of a long work day