Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Feb 01 10:11
    @SystemFw banned @Hudsone_gitlab
  • Jan 31 2019 04:19
    404- forked
    404-/fs2
  • Jan 31 2019 03:01
    SethTisue commented #1232
  • Jan 30 2019 17:22
  • Jan 30 2019 13:45
  • Jan 30 2019 10:48
    pchlupacek commented #1406
  • Jan 30 2019 10:47
    pchlupacek commented #1406
  • Jan 30 2019 10:39
    pchlupacek commented #1407
  • Jan 30 2019 09:58
    lJoublanc commented #870
  • Jan 30 2019 09:42
    vladimir-popov commented #1407
  • Jan 30 2019 08:10
    vladimir-popov closed #1407
  • Jan 30 2019 08:10
    vladimir-popov commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 18:57
    SystemFw commented #1406
  • Jan 29 2019 17:47
    pchlupacek commented #1406
  • Jan 29 2019 17:42
    pchlupacek commented #1406
  • Jan 29 2019 17:39
    pchlupacek commented #1407
  • Jan 29 2019 17:39
    vladimir-popov edited #1407
  • Jan 29 2019 17:38
    vladimir-popov commented #1406
Dennis
@dennis4b_twitter
wow, actually yes
a last one: I have a Stream[IO,Message]. I want to make this into a Stream[IO,Option[Message]] with the following semantics: if there is a was no Message coming from the original stream within the last 10 seconds, emit a None. (My usecase: inject keepalives into a stream of websocket messages to the client, but instead of at some fixed interval do it only if I haven't sent any legitimate message within the last 10 seconds)
Dennis
@dennis4b_twitter
I currently merge with an interval stream, but that way I send the keepalives suboptimally because I don't need to send them if I am already sending other traffic
Fabio Labella
@SystemFw
yeah once you get into a situation when you need finer control, the strategy is generally to have an explicit queue
this particular case you have won't necessarily be super easy to implement
you can have a look at the implementation of groupWithin to take some inspiration
if not I can probably write it down for you at some point
the main difficulty is that you have a timeout that can timeout itself
i.e. when the stream emits, you start a 10 second timeout
but if it does emit, you need to timeout the timeout
Dennis
@dennis4b_twitter
a Queue would work if there was something like timedDequeue(1.seconds) that would return Some(_) or None on timeout
Fabio Labella
@SystemFw
I've implemented this logic in several ways for different use cases
i.e. once in the cats effect channel, (for a similar use case), using one strategy
and once in groupWithin (with a different strategy)
(and several other times in other places)
if you look at groupWithin you can see how it's done
Dennis
@dennis4b_twitter
looking now
Fabio Labella
@SystemFw
let me send you a few links
1) and 3) are fairly similar
using a SignallingRef could be another way (you sample it at a given rate and check if the latest emitted element has changed)
Fabio Labella
@SystemFw
yeah, there are many ways to do this
let me know if you make progress or get stuck
Dennis
@dennis4b_twitter
What would be the efficiency of something like Alarm for every read/write? In any case, in the websocket stream case the exact timeout is not so important so you can merge with a 1 second interval stream and then count upwards, emitting a keepalive on 10, or resetting on a message
Fabio Labella
@SystemFw
in all three approaches I would do it on chunks, not individual elements, if possible
possibly the approach the doesn't use alarm is more efficient
but to be quite honest I don't know, you'll have to measure it
Dennis
@dennis4b_twitter
If I have for example 100 such streams (100 clients), are there efficiencies in merging these with a single interval stream (if that is even possible)? it'd be nice not to wake up 100 keepalives at the exact same time, but not sure of the overhead of each additional interval stream
Fabio Labella
@SystemFw
I can help you with the queue based approach
I think that's a separate problem which should be dealt with separately
i.e. solve it for one client, then worry about possibly skewing them
I'd go with the queue approach though, it will give you more control
you need a Queue and Ref[F, Token]
Dennis
@dennis4b_twitter
I will look into it a bit later, finishing some other parts first (the websockets do work atm, just could be better, but have lots of old actor code that's becoming magical streaming code)
Dennis
@dennis4b_twitter
Another example: I have a list of 100 PDF documents to generate and merge. Let's say a List[IO[PdfDocument]]. Using parSequence I efficiently get the resulting IO[List[PdfDocument]]. However, afterwards I want to merge these into 1 document, in the same order. Using parSequence I can not start merging until all 100 documents are done. So I would need something like a stream of generated documents, that are generated in parallel (like parJoin(n)), but whose output is still in the original order. So, if document "2" takes very long generate, it will have emitted "1", internally generated already "3", "4" and "5", but only when "2" is ready are these emitted.
That should be the most efficient pipeline.
Fabio Labella
@SystemFw
you can try with mapAsync(...).fold
I think that should do exactly what you want
Fabio Labella
@SystemFw
you could actually be even more efficient I guess, if you start merging 3 4 and 5 as they come. That's harder to implement though
it's a scatter-gather pattern
but Stream can't help you with that
Dennis
@dennis4b_twitter
technically the PDF format allows that but then I have to dig a lot deeper than the pdfbox merging code, if I can just do it like above it'll be a massive improvement over the current code
Fabio Labella
@SystemFw
however , mapAsync(..).fold is already something
yeah
Dennis
@dennis4b_twitter
but I'm not really sure how to use mapAsync, can't find a lot of info on it
Fabio Labella
@SystemFw
I'd go with that then
just pass it the function that renders
Stream.emits(unrenderedPdfList).mapAsync(render)(10).fold...
Dennis
@dennis4b_twitter
ow was stuck on IO.mapAsync, sorry
Fabio Labella
@SystemFw
runs 10 concurrently, but preserves the order and starts folding as soon as the elements arrive

technically the PDF format allows that but then I have to dig a lot deeper than the pdfbox merging code

Tbh it would be enough to know that the merging is monoidal (associative, in particular), but I agree that the solution above is way simpler and already a substantial improvement