Dennis
@dennis4b_twitter
looking now
Fabio Labella
@SystemFw
let me send you a few links
1) and 3) are fairly similar
using a SignallingRef could be another way (you sample it at a given rate and check if the latest emitted element has changed)
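A minimal sketch of the SignallingRef sampling idea (using fs2 3.x / cats-effect 3 names, which differ slightly from the 2019-era API; `withKeepAlive`, `incoming` and `keepAlive` are hypothetical names, not from the chat):

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef

// Bump a counter on every message; a sampler checks once per second
// whether the counter moved, and fires the keepalive action if it didn't.
def withKeepAlive[A](incoming: Stream[IO, A], keepAlive: IO[Unit]): Stream[IO, A] =
  Stream.eval(SignallingRef[IO, Long](0L)).flatMap { seen =>
    val tracked = incoming.evalTap(_ => seen.update(_ + 1))
    val sampler =
      Stream.awakeEvery[IO](1.second)
        .evalMap(_ => seen.get)
        .zipWithPrevious
        .evalMap {
          case (Some(prev), cur) if prev == cur => keepAlive // nothing arrived since last sample
          case _                                => IO.unit
        }
    tracked.concurrently(sampler)
  }
```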
Fabio Labella
@SystemFw
yeah, there are many ways to do this
let me know if you make progress or get stuck
Dennis
@dennis4b_twitter
What would be the efficiency of something like Alarm for every read/write? In any case, in the websocket stream case the exact timeout is not so important, so you can merge with a 1-second interval stream and then count upwards, emitting a keepalive at 10, or resetting the count on a message
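The merge-and-count idea can be sketched like this (again a sketch with fs2 3.x names; `keepAlives` is a hypothetical helper):

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream

// Merge messages with a 1-second tick stream; count ticks since the
// last message and emit a keepalive once 10 quiet ticks have passed.
def keepAlives[A](messages: Stream[IO, A], keepAlive: A): Stream[IO, A] = {
  val msgs: Stream[IO, Either[A, Unit]]  = messages.map(Left(_))
  val ticks: Stream[IO, Either[A, Unit]] = Stream.awakeEvery[IO](1.second).map(_ => Right(()))
  msgs.merge(ticks)
    .mapAccumulate(0) {
      case (_, Left(msg)) => (0, Some(msg))   // message: reset the counter and pass it on
      case (n, Right(_))  =>
        if (n + 1 >= 10) (0, Some(keepAlive)) // 10 quiet ticks: emit a keepalive, reset
        else (n + 1, None)                    // quiet tick: just count
    }
    .collect { case (_, Some(a)) => a }
}
```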
Fabio Labella
@SystemFw
in all three approaches I would do it on chunks, not individual elements, if possible
the approach that doesn't use an alarm is possibly more efficient
but to be quite honest I don't know, you'll have to measure it
Dennis
@dennis4b_twitter
If I have for example 100 such streams (100 clients), are there efficiencies in merging these with a single interval stream (if that is even possible)? it'd be nice not to wake up 100 keepalives at the exact same time, but not sure of the overhead of each additional interval stream
Fabio Labella
@SystemFw
I can help you with the queue based approach
I think that's a separate problem which should be dealt with separately
i.e. solve it for one client, then worry about possibly skewing them
I'd go with the queue approach though, it will give you more control
you need a Queue and Ref[F, Token]
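One way the Queue plus Ref[F, Token] combination might look (a sketch under cats-effect 3; `Token`, `onMessage`, and `watch` are hypothetical names, not Fabio's actual design):

```scala
import scala.concurrent.duration._
import cats.effect.{IO, Ref}
import cats.effect.std.Queue

// Every message stores a fresh token; a watcher sleeps and fires a
// keepalive only if no newer message replaced its token in the meantime.
final class Token

def onMessage[A](q: Queue[IO, A], last: Ref[IO, Token])(msg: A): IO[Unit] =
  last.set(new Token) >> q.offer(msg)

def watch[A](q: Queue[IO, A], last: Ref[IO, Token], keepAlive: A): IO[Unit] = {
  val check: IO[Unit] =
    for {
      before <- last.get
      _      <- IO.sleep(10.seconds)
      after  <- last.get
      _      <- if (before eq after) q.offer(keepAlive) // no activity while we slept
                else IO.unit
    } yield ()
  check.foreverM
}
```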
Dennis
@dennis4b_twitter
I will look into it a bit later, finishing some other parts first (the websockets do work atm, they just could be better, but I have lots of old actor code that's becoming magical streaming code)
Dennis
@dennis4b_twitter
Another example: I have a list of 100 PDF documents to generate and merge. Let's say a List[IO[PdfDocument]]. Using parSequence I efficiently get the resulting IO[List[PdfDocument]]. However, afterwards I want to merge these into 1 document, in the same order. Using parSequence I cannot start merging until all 100 documents are done. So I would need something like a stream of generated documents, that are generated in parallel (like parJoin(n)), but whose output is still in the original order. So, if document "2" takes very long to generate, it will have emitted "1", internally generated already "3", "4" and "5", but only when "2" is ready are these emitted.
That should be the most efficient pipeline.
Fabio Labella
@SystemFw
you can try with mapAsync(...).fold
I think that should do exactly what you want
Fabio Labella
@SystemFw
you could actually be even more efficient, I guess, if you start merging 3, 4 and 5 as they come. That's harder to implement though
it's a scatter-gather pattern
but Stream can't help you with that
Dennis
@dennis4b_twitter
technically the PDF format allows that but then I have to dig a lot deeper than the pdfbox merging code, if I can just do it like above it'll be a massive improvement over the current code
Fabio Labella
@SystemFw
however, mapAsync(...).fold is already something
yeah
Dennis
@dennis4b_twitter
but I'm not really sure how to use mapAsync, can't find a lot of info on it
Fabio Labella
@SystemFw
I'd go with that then
just pass it the function that renders
Stream.emits(unrenderedPdfList).mapAsync(10)(render).fold...
Dennis
@dennis4b_twitter
oh, I was stuck on IO.mapAsync, sorry
Fabio Labella
@SystemFw
runs 10 concurrently, but preserves the order and starts folding as soon as the elements arrive
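Fabio's description corresponds to a pipeline like this (a sketch; `PdfDocument`, `render`, and `merge` here are stand-ins for the real pdfbox-based code):

```scala
import cats.effect.IO
import fs2.Stream

final case class PdfDocument(pages: Vector[String])

def render(i: Int): IO[PdfDocument] = IO(PdfDocument(Vector(s"page-$i")))
def merge(a: PdfDocument, b: PdfDocument): PdfDocument = PdfDocument(a.pages ++ b.pages)

// up to 10 renders run in parallel, output order is preserved,
// and folding starts as soon as each element arrives
val merged: IO[Option[PdfDocument]] =
  Stream.emits(1 to 100).covary[IO]
    .mapAsync(10)(render)
    .fold(PdfDocument(Vector.empty))(merge)
    .compile.last
```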

technically the PDF format allows that but then I have to dig a lot deeper than the pdfbox merging code

Tbh it would be enough to know that the merging is monoidal (associative, in particular), but I agree that the solution above is way simpler and already a substantial improvement

Dennis
@dennis4b_twitter
ah like that you mean, well, I guess it is. Given 2 documents you can add one to the other, it'd be more a question of how efficient this is compared to linearly writing the result
writing in order you can write to a file and have a low memory profile
what would it look like scatter-gather style?
(i initially meant that you could have a PDF file where pages 4-6 are defined at the beginning, pages 1-2 at the end, and 3 in the middle, for example. But yes you can also merge adjacent docs until you end up with 1 final doc)
Fabio Labella
@SystemFw

what would it look like scatter-gather style

You have List, you keep bisecting and recursively run the function on both sides, in parallel
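The bisecting scheme can be sketched with parMapN, relying only on the merge being associative (`mergeAll` is a hypothetical helper, not from the chat):

```scala
import cats.effect.IO
import cats.syntax.all._

// Split the list in half, recurse on both halves in parallel,
// then combine the two results. Requires `combine` to be associative.
def mergeAll[A](jobs: List[IO[A]])(combine: (A, A) => A): IO[Option[A]] =
  jobs match {
    case Nil      => IO.pure(None)
    case x :: Nil => x.map(Some(_))
    case _ =>
      val (l, r) = jobs.splitAt(jobs.size / 2)
      (mergeAll(l)(combine), mergeAll(r)(combine)).parMapN {
        case (Some(a), Some(b)) => Some(combine(a, b))
        case (a, b)             => a.orElse(b)
      }
  }
```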

i initially meant that you could have a PDF file where pages 4-6 are defined at the beginning, pages 1-2 at the end, and 3 in the middle, f

If that's the case you need to have all the parts already there, for reordering, i.e. you do need parTraverse

Dennis
@dennis4b_twitter
def render(i: Int) = s"doc-${i}" 
val jobs = List( IO.pure(1), IO.pure(2), IO.pure(3) )
Stream.emits(jobs).mapAsync(10)(render)

gives:
type mismatch;
[error]  found   : String
[error]  required: fs2.Pure[?]
[error]                         Stream.emits(jobs).mapAsync(10)(render)
sorry, I haven't tried this before
Dennis
@dennis4b_twitter
ok need a covary[IO] in there to compile
Fabio Labella
@SystemFw
mm no, wait
with mapAsync, your List is of Int
not of IO
Stream(1,2,3).mapAsync(10)(render)
Dennis
@dennis4b_twitter
yeah, but I started with a List[IO[PdfDocument]], but I can move the IO creation into the mapAsync and now I think I'm almost there
Fabio Labella
@SystemFw
yeah, you need to start with List[PdfDocument]
or emits.mapAsync(identity)
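If you do keep the List[IO[...]], the identity variant might look like this (a sketch; `runAll` is a hypothetical name):

```scala
import cats.effect.IO
import fs2.Stream

// run up to 10 of the IOs at a time, preserving the list's order
def runAll[A](jobs: List[IO[A]]): Stream[IO, A] =
  Stream.emits(jobs).covary[IO].mapAsync(10)(identity)
```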
Dennis
@dennis4b_twitter
Incredible, one screen of code and it works great with all the cpu intensive stuff nicely parallelized.