Brandon G
@gannicottb
haha, didn't know .noneTerminate existed. the onFinalize approach at least doesn't leave the queue hanging if something happens to the stream, might go that way for now
Ion Alberdi
@yetanotherion

mergeHaltR and mergeHaltL don't really guarantee which side the last element comes from; they only tell you which side is going to trigger interruption

@SystemFw thank you, do you think it would be worth mentioning somewhere in the code / doc? I'd be glad to give it a try (if it would be welcome)

Fabio Labella
@SystemFw
why not, clarifying scaladoc is always useful
Paulius Imbrasas
@CremboC
is there something like scan, but the output type is not the same as the state? i.e.
def scan2[O2, O3](z: O2)(f: (O2, O) => (O2, O3)): Stream[F, O3]
Ion Alberdi
@yetanotherion

why not, clarifying scaladoc is always useful

functional-streams-for-scala/fs2#1837

Fabio Labella
@SystemFw
@CremboC isn't that basically mapAccumulate?
Paulius Imbrasas
@CremboC
yeah, it is, just need to do a map afterwards, but good enough :P
mapAccumulate returns Stream[F, (O2, O3)]
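A minimal sketch of the "mapAccumulate, then map" idea discussed above, assuming an fs2 version where `Stream#mapAccumulate` and `toList` on pure streams are available. The object name, the running-sum state, and the string labels are made up for illustration.

```scala
import fs2.Stream

object MapAccumulateSketch {
  // State (O2) is the running sum as an Int; the emitted value (O3) is a String.
  // mapAccumulate yields (state, output) pairs, so map(_._2) keeps only the output.
  val labels: List[String] =
    Stream(1, 2, 3)
      .mapAccumulate(0)((sum, n) => (sum + n, s"total=${sum + n}"))
      .map(_._2)
      .toList
  // labels == List("total=1", "total=3", "total=6")
}
```

The tuple is still allocated per element; this only hides the state from downstream consumers.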
otto-dev
@otto-dev
Hi guys. Is there a way to apply a stateful mapping function, such as (S, A) => Option[(S, B)], to each element, where S is some state that is carried along from one element to the next based on the previous result, and B is the element to be emitted? Similar to scanChunksOpt, but applied to elements, not chunks
otto-dev
@otto-dev
@CremboC Hey, that's exactly my question as well, just in different words! mapAccumulate is a good start, but did you find any other approach?
Fabio Labella
@SystemFw
I would go with mapAccumulate(..).unNone.map(_._2)
but I'm not sure what the meaning of Option is in your example
otto-dev
@otto-dev
Thanks for your tip. The meaning of None is end-of-stream
But that's not the focus of my question anyway. It's just the signature that I'm familiar with. Primary question was how to map statefully
otto-dev
@otto-dev
mapAccumulate on Chunk seems to have the signature @CremboC and I are looking for
Gavin Bisesi
@Daenyth
Is A != B?
unfold or unfoldEval may be part of what you want
you could also have a stream with (non-concurrent) effects in StateT maybe
otto-dev
@otto-dev
Yeah it's like unfold, but on a stream
Gavin Bisesi
@Daenyth
Right, Stream's companion object has such methods
otto-dev
@otto-dev
Yes, but that would mean that the actual stream instance becomes part of the state passed as zero into unfold, which becomes quite clumsy
Gavin Bisesi
@Daenyth
I don't follow
Do you mean that you have a stream of elements and you want to apply the state transformations over them?
otto-dev
@otto-dev
Yes, that's right
Gavin Bisesi
@Daenyth
rather than using unfold to produce the stream in the first place from a seed?
ok, makes sense
otto-dev
@otto-dev
You got me
Gavin Bisesi
@Daenyth
mapAccumulate + unNoneTerminate does sound close to right
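One way the mapAccumulate + unNoneTerminate combination could look for a step function of shape (S, A) => Option[(S, B)], where None means end-of-stream. This is a sketch, not a library method: `step`, its threshold of 10, and the object name are hypothetical.

```scala
import fs2.Stream

object StatefulMapSketch {
  // Hypothetical step function: emit the running sum as a label, and return
  // None (end-of-stream) once the sum would exceed 10.
  def step(sum: Int, a: Int): Option[(Int, String)] = {
    val next = sum + a
    if (next > 10) None else Some((next, s"sum=$next"))
  }

  val out: List[String] =
    Stream(3, 4, 5, 6)
      .mapAccumulate[Int, Option[String]](0) { (s, a) =>
        step(s, a) match {
          case Some((s2, b)) => (s2, Some(b)) // advance the state and emit b
          case None          => (s, None)     // keep the state, mark termination
        }
      }
      .map(_._2)       // drop the state, keep the Option[String]
      .unNoneTerminate // halt the stream at the first None
      .toList
  // out == List("sum=3", "sum=7")
}
```

Using `unNone` instead of `unNoneTerminate` would skip the None results and keep going rather than stop.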
otto-dev
@otto-dev
Thanks for your lead
I think I have got it now, digging the source code a bit using the IDE
Here is what I've got: Say, as an example, you want to calculate the Int difference between the current element compared to the last element, as a simple stateful example
def differenceToLast(last: Option[Int], current: Int): (Some[Int], Option[Int]) = (Some(current), last.map(current - _))
Then the most direct way I've found, without "unnecessary" element-wise tuple creation is this:
Gavin Bisesi
@Daenyth
Well, in that example I'd use zipWithNext or zipWithPrevious
and then map
otto-dev
@otto-dev
Stream(1, 4, 3, 4).mapChunks(c => c.mapAccumulate(None: Option[Int])(differenceToLast)._2)
Gavin Bisesi
@Daenyth
why on the chunks?
otto-dev
@otto-dev
The chunk signature is different. It only returns the result element, while on Streams it returns a tuple of state and result
Gavin Bisesi
@Daenyth
Right, because streams can be infinite, so each element is a "step", while Chunks are finite strict collections
you can .map(_._2) on the stream after accumulating
but if you only need pairwise I'd definitely zipWithNext/previous
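For the pairwise case, the zipWithPrevious suggestion could be sketched as follows. The object name is made up; the input is the same Stream(1, 4, 3, 4) used in the discussion.

```scala
import fs2.Stream

object PairwiseDiffSketch {
  // zipWithPrevious pairs each element with its predecessor (None for the first),
  // so no explicit state needs to be threaded through.
  val diffs: List[Option[Int]] =
    Stream(1, 4, 3, 4).zipWithPrevious
      .map { case (prev, cur) => prev.map(cur - _) }
      .toList
  // diffs == List(None, Some(3), Some(-1), Some(1))
}
```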
otto-dev
@otto-dev
Maybe this is a case of over-thinking it, but I don't like the tuple creation just to destroy it right after. I wonder if that makes much of a performance difference, e.g. for numeric computations
What do you think? Premature optimization?
otto-dev
@otto-dev
Also, I have to use scanChunks instead of mapChunks, like so:
  def differenceToLast(last: Option[Int], current: Int): (Some[Int], Option[Int]) = (Some(current), last.map(current - _))
  Stream(1, 4, 3, 4).scanChunks(None: Option[Int])((acc, c) => c.mapAccumulate(acc)(differenceToLast))
@CremboC Maybe the above helps you as well. It's stolen from the mapAccumulate implementation in Stream
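A sketch of why scanChunks is needed here rather than mapChunks: with mapChunks the accumulator restarts at every chunk boundary, while scanChunks carries it across. The two-chunk input, the `diff` function value (a variant of differenceToLast with the state typed as Option[Int]), and the object name are assumptions made for illustration.

```scala
import fs2.{Chunk, Pure, Stream}

object ChunkStateSketch {
  // Same logic as differenceToLast, written as a function value.
  val diff: (Option[Int], Int) => (Option[Int], Option[Int]) =
    (last, current) => (Some(current), last.map(current - _))

  // Two separate chunks: [1, 4] and [3, 4].
  val input: Stream[Pure, Int] = Stream(1, 4) ++ Stream(3, 4)

  // mapChunks starts again from None on every chunk, so the 4 -> 3 step is lost.
  val perChunk: List[Option[Int]] =
    input.mapChunks(c => c.mapAccumulate(None: Option[Int])(diff)._2).toList
  // perChunk == List(None, Some(3), None, Some(1))

  // scanChunks threads the final state of one chunk into the next.
  val threaded: List[Option[Int]] =
    input
      .scanChunks(None: Option[Int])((acc: Option[Int], c: Chunk[Int]) => c.mapAccumulate(acc)(diff))
      .toList
  // threaded == List(None, Some(3), Some(-1), Some(1))
}
```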
Gavin Bisesi
@Daenyth
super premature
Profile it if it's not good enough
Simple code first, reuse library methods before doing something more complex
otto-dev
@otto-dev
Yeah agreed