Fabio Labella
@SystemFw
in that case you're better off with a simple flatMap
but in many cases, Stream allows you to separate the "when" from the "what", which isn't possible with IO
so I tend to have timed streams separated, and zip them with my logic stream
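(A minimal sketch of that zip pattern, assuming fs2 3.x and cats-effect 3; the tick period and the contents of the logic stream are illustrative:)

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// the "when": a tick every 10ms; the "what": a stream of results to emit.
// zipRight pairs each tick with the next element of the logic stream and
// keeps only the logic's elements, so the logic advances once per tick.
val ticks = Stream.awakeEvery[IO](10.millis)
val logic = Stream.emits(List("a", "b", "c")).covary[IO]

val out: List[String] = ticks.zipRight(logic).compile.toList.unsafeRunSync()
// out == List("a", "b", "c"), one element per tick
```

The zip terminates when the shorter side (the logic stream) ends, so the infinite tick stream does not leak.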
Dennis
@dennis4b_twitter
If you don't mind me asking (I'm still struggling a bit with timed streams): suppose I have a SignallingRef[IO, Int] holding the number of outstanding jobs. On job completion I execute .update(_ - 1). I want to push each new value to the user, which is easy by mapping counter.discrete. However, updates can come in quick succession, and after sending one update I don't want to send the next one sooner than one second later, so the possibilities are:
1) nothing comes, nothing to do
2) updates come at 0.3s, 0.6s, 0.8s, and then at 3.5s -> I'd like to send the value set at 0.8s to the user at 1.0s, and since the update at 3.5s arrived more than a second later, send it immediately on receipt (but then again send no further updates until 4.5s)
I hope this makes sense :-) It's a bit like debouncing, I guess
Fabio Labella
@SystemFw
there is a debounce combinator I think
is that enough?
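(A small sketch of debounce, assuming fs2 3.x; the timings mirror the scenario above on a smaller scale:)

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// three updates in quick succession, a pause, then a final update:
// debounce emits an element only once the stream has been quiet for 100ms,
// so the intermediate values 1 and 2 are superseded by 3
val updates = Stream(1, 2, 3) ++ Stream.sleep_[IO](500.millis) ++ Stream(4)

val out: List[Int] = updates.debounce(100.millis).compile.toList.unsafeRunSync()
// out == List(3, 4)
```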
Dennis
@dennis4b_twitter
wow, actually yes
a last one: I have a Stream[IO, Message] that I want to turn into a Stream[IO, Option[Message]] with the following semantics: if no Message came from the original stream within the last 10 seconds, emit a None. (My use case: inject keepalives into a stream of websocket messages to the client, but instead of using a fixed interval, do it only if I haven't sent any legitimate message within the last 10 seconds.)
Dennis
@dennis4b_twitter
I currently merge with an interval stream, but that way I send the keepalives suboptimally, because I don't need them when I'm already sending other traffic
Fabio Labella
@SystemFw
yeah, once you get into a situation where you need finer control, the strategy is generally to have an explicit queue
this particular case of yours won't necessarily be super easy to implement
you can have a look at the implementation of groupWithin to take some inspiration
if not I can probably write it down for you at some point
the main difficulty is that you have a timeout that can itself be timed out
i.e. when the stream emits, you start a 10-second timeout
but if the stream emits again before that fires, you need to time out the timeout, i.e. cancel it
Dennis
@dennis4b_twitter
a Queue would work if there were something like timedDequeue(1.second) that returned Some(_) or None on timeout
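(Something close to that timedDequeue can be sketched with cats-effect 3's std Queue and IO#timeout; timedDequeue is Dennis's hypothetical name, not a real API:)

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global

// block on the queue for at most `d`: Some(a) if an element arrived,
// None if the timeout fired first (timeout raises, handleError recovers)
def timedDequeue[A](q: Queue[IO, A], d: FiniteDuration): IO[Option[A]] =
  q.take.timeout(d).map(Option(_)).handleError(_ => None)

val result: (Option[Int], Option[Int]) = (for {
  q    <- Queue.unbounded[IO, Int]
  _    <- q.offer(42)
  hit  <- timedDequeue(q, 100.millis) // element already there: Some(42)
  miss <- timedDequeue(q, 50.millis)  // queue now empty: times out to None
} yield (hit, miss)).unsafeRunSync()
// result == (Some(42), None)
```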
Fabio Labella
@SystemFw
I've implemented this logic in several ways for different use cases
i.e. once in the cats effect channel, (for a similar use case), using one strategy
and once in groupWithin (with a different strategy)
(and several other times in other places)
if you look at groupWithin you can see how it's done
Dennis
@dennis4b_twitter
looking now
Fabio Labella
@SystemFw
let me send you a few links
1) and 3) are fairly similar
using a SignallingRef could be another way (you sample it at a given rate and check if the latest emitted element has changed)
Fabio Labella
@SystemFw
yeah, there are many ways to do this
let me know if you make progress or get stuck
Dennis
@dennis4b_twitter
What would be the efficiency of something like Alarm for every read/write? In any case, in the websocket stream case the exact timeout isn't so important, so you can merge with a 1-second interval stream and count upwards, emitting a keepalive at 10, or resetting on a message
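(Dennis's tick-counting idea could look roughly like this, assuming fs2 3.x; withKeepalives and the quiet threshold are illustrative names, not library API:)

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// merge the message stream with a tick stream and count consecutive quiet
// ticks: a real message resets the counter, `quiet` quiet ticks in a row
// emit a None keepalive. mergeHaltL stops ticking once the messages end.
def withKeepalives[A](
    msgs: Stream[IO, A],
    tick: FiniteDuration,
    quiet: Int
): Stream[IO, Option[A]] =
  msgs
    .map(a => Option(a))
    .mergeHaltL(Stream.fixedRate[IO](tick).map(_ => Option.empty[A]))
    .mapAccumulate(0) { (n, elem) =>
      elem match {
        case Some(a)                => (0, Option(Option(a)))           // traffic: reset counter
        case None if n + 1 >= quiet => (0, Option(Option.empty[A]))     // quiet too long: keepalive
        case None                   => (n + 1, Option.empty[Option[A]]) // quiet tick, emit nothing
      }
    }
    .collect { case (_, Some(out)) => out }

// with constant traffic and a long quiet threshold, no keepalive is injected
val out: List[Option[Int]] =
  withKeepalives(Stream.emits(List(1, 2, 3)).covary[IO], 200.millis, 5)
    .compile.toList.unsafeRunSync()
// out == List(Some(1), Some(2), Some(3))
```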
Fabio Labella
@SystemFw
in all three approaches I would do it on chunks, not individual elements, if possible
possibly the approach that doesn't use an alarm is more efficient
but to be quite honest I don't know, you'll have to measure it
Dennis
@dennis4b_twitter
If I have, for example, 100 such streams (100 clients), are there efficiencies in merging them with a single interval stream (if that's even possible)? It'd be nice not to wake up 100 keepalives at exactly the same time, but I'm not sure of the overhead of each additional interval stream
Fabio Labella
@SystemFw
I can help you with the queue based approach
I think that's a separate problem which should be dealt with separately
i.e. solve it for one client, then worry about possibly skewing them
I'd go with the queue approach though, it will give you more control
you need a Queue and Ref[F, Token]
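(One way the Queue-plus-Ref[F, Token] idea can work, sketched with cats-effect 3; this is an illustrative reconstruction of the token trick, not the actual groupWithin implementation:)

```scala
import scala.concurrent.duration._
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import cats.syntax.all._

final class Token // unique identity for each armed timeout

// store a fresh token, then sleep in the background; when the sleep ends,
// fire the action only if our token is still the current one. Arming a
// new timeout therefore silently invalidates any pending one.
def resetTimeout(current: Ref[IO, Token], d: FiniteDuration, onTimeout: IO[Unit]): IO[Unit] =
  for {
    mine <- IO(new Token)
    _    <- current.set(mine)
    _    <- (IO.sleep(d) *> current.get.flatMap(t => onTimeout.whenA(t eq mine))).start.void
  } yield ()

val fired: Int = (for {
  counter <- Ref.of[IO, Int](0)
  tok     <- Ref.of[IO, Token](new Token)
  _       <- resetTimeout(tok, 50.millis, counter.update(_ + 1))
  _       <- resetTimeout(tok, 50.millis, counter.update(_ + 1)) // invalidates the first
  _       <- IO.sleep(300.millis)
  n       <- counter.get
} yield n).unsafeRunSync()
// fired == 1: only the most recently armed timeout fires
```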
Dennis
@dennis4b_twitter
I'll look into it a bit later; finishing some other parts first (the websockets do work atm, they just could be better, but there's a lot of old actor code that's becoming magical streaming code)
Dennis
@dennis4b_twitter
Another example: I have a list of 100 PDF documents to generate and merge, say a List[IO[PdfDocument]]. Using parSequence I efficiently get the resulting IO[List[PdfDocument]]. However, afterwards I want to merge these into one document, in the same order. With parSequence I can't start merging until all 100 documents are done. So I'd need something like a stream of generated documents, produced in parallel (like parJoin(n)), but whose output is still in the original order. So if document "2" takes very long to generate, the stream will have emitted "1" and internally generated "3", "4" and "5" already, but only emit those once "2" is ready.
That should be the most efficient pipeline.
Fabio Labella
@SystemFw
you can try with mapAsync(...).fold
I think that should do exactly what you want
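(A sketch of mapAsync(...).fold for the ordered-merge scenario, assuming fs2 3.x; render and the string "documents" are stand-ins for the real PDF generation and pdfbox merging:)

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// stand-in for generating one PDF; later documents finish sooner here,
// to show that output order is still the input order
def render(i: Int): IO[String] = IO.sleep(((6 - i) * 10).millis).as(s"doc$i")

val merged: String =
  Stream.emits(1 to 5)
    .covary[IO]
    .mapAsync(4)(render)               // up to 4 renders in flight at once...
    .fold("")((acc, doc) => acc + doc) // ...but folded in the original order
    .compile
    .lastOrError
    .unsafeRunSync()
// merged == "doc1doc2doc3doc4doc5"
```

mapAsync runs the effects with bounded parallelism but emits results in input order, which is exactly the "generate in parallel, merge in order" requirement.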
Fabio Labella
@SystemFw
you could actually be even more efficient, I guess, if you start merging 3, 4 and 5 as they come. That's harder to implement though
it's a scatter-gather pattern
but Stream can't help you with that
Dennis
@dennis4b_twitter
technically the PDF format allows that, but then I'd have to dig a lot deeper than the pdfbox merging code; if I can just do it like above it'll be a massive improvement over the current code
Fabio Labella
@SystemFw
however, mapAsync(...).fold is already something
yeah