These are chat archives for akkadotnet/AkkaStreams

27th
Sep 2018
AndreSteenbergen
@AndreSteenbergen
Sep 27 2018 11:48
Is it possible to zip 3 streams, with the caveat that the output stays ordered by one field? I mean there is a property, DateTime Raised, in the stream message. Is there a Zip stage that is smart enough to send through only 1 of the 3 messages, the one with the lowest datetime?
Sorry, not Zip, I mean a Merge of streams; Zip does something different
Marc Piechura
@marcpiechura
Sep 27 2018 12:08
Out of the box we only support MergePreferred and MergePrioritized; if those don’t fit, you probably have to write a custom GraphStage
But wouldn’t Zip work for your case? It only emits once all 3 have an element, and you can then select the one with the lowest datetime
AndreSteenbergen
@AndreSteenbergen
Sep 27 2018 12:12
What to do with the other two values? The "winning" partition might have a next value that is lower than the other two as well. And the streams aren't balanced... Custom stage it will be, I guess.
I can work from merge preferred and go from there
I guess
Marc Piechura
@marcpiechura
Sep 27 2018 12:13
I thought you only want to have one of three
If your plan is to merge the three and then order the three messages by datetime, you could use merge + something like collecting 3 messages into a list, ordering the list + selectmany
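A minimal sketch of the "collect 3, order the list, flatten" idea, written in plain Python rather than against the Akka.NET API. The function name `sort_in_batches` and the sample arrival order are hypothetical, chosen only for illustration.

```python
from itertools import islice

def sort_in_batches(merged, batch_size=3):
    """Collect batch_size items, sort each batch, then flatten the batches."""
    it = iter(merged)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        # Ordering is only guaranteed inside one batch, not across batches.
        yield from sorted(batch)

# Hypothetical arrival order after an unordered merge of three inputs.
arrivals = [5, 3, 4, 1, 9, 2]
print(list(sort_in_batches(arrivals)))  # [3, 4, 5, 1, 2, 9]
```

Each group of three comes out sorted, but the sequence as a whole is not, which matters if a later element from one input is lower than elements already emitted.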
AndreSteenbergen
@AndreSteenbergen
Sep 27 2018 12:23
Sorry if I am not completely clear. I have a partitioned stream; all messages per partition are in order, but the messages in the stream itself might not be in the correct order. So the stream might have 2 partitions, 0 and 1, with ints in this case: <0,3><1,5><0,7><1,6><1,9><0,8>. I would like to get messages 3, 5, 6, 7, 8, 9. My plan was to use PartitionHub to split the 0 and 1 streams, and re-merge the 2 streams into 1 stream based on the value.
So I can't collect 1 message from all, process them all and continue. That would be zip
I think I'll go with a custom stage that gets a value from every source, passes on the one with the lowest value, and requests a new value from the winning source, until all sources complete.
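The logic described for the custom stage can be sketched in plain Python as a k-way ordered merge. This is not Akka.NET code: it pulls from in-memory iterables instead of live inlets with backpressure, and the name `merge_lowest_first` is hypothetical. It assumes each source is already ordered, as stated for the partitions above.

```python
import heapq

_DONE = object()  # sentinel marking an exhausted source

def merge_lowest_first(*sources):
    """Hold one value per source, emit the lowest, refill from the winner."""
    iterators = [iter(s) for s in sources]
    heap = []
    for index, it in enumerate(iterators):
        first = next(it, _DONE)
        if first is not _DONE:
            heap.append((first, index))  # index breaks ties between sources
    heapq.heapify(heap)
    while heap:
        value, index = heapq.heappop(heap)
        yield value
        # Request the next value only from the source that just won.
        following = next(iterators[index], _DONE)
        if following is not _DONE:
            heapq.heappush(heap, (following, index))

# The <partition, value> stream from the example above, split per partition.
stream = [(0, 3), (1, 5), (0, 7), (1, 6), (1, 9), (0, 8)]
partition0 = [v for p, v in stream if p == 0]  # [3, 7, 8]
partition1 = [v for p, v in stream if p == 1]  # [5, 6, 9]
print(list(merge_lowest_first(partition0, partition1)))  # [3, 5, 6, 7, 8, 9]
```

A real stage would additionally have to wait until every still-open input has delivered an element before it can decide which one is lowest.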
Marc Piechura
@marcpiechura
Sep 27 2018 12:36
Alright got it
So maybe a simple merge + unfold would work: use a list as your state; when the first element arrives, put it in the list and return null; when the second element arrives, also put it in the list and return list.Min()
and keep it going for all elements that come after the second one, i.e. add to the list, return list.Min()
Marc Piechura
@marcpiechura
Sep 27 2018 12:41
Argh ok no unfold doesn’t work that way 😂

“completes when the unfold function returns a null value”

But you get the idea, we also have selectstateful / selectmanystateful
Maybe one of those would work
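A rough sketch of the stateful variant of that idea, in plain Python rather than with selectstateful / selectmanystateful. Two details are assumptions not stated in the chat: the emitted minimum is removed from the list, and whatever is left is flushed when the input ends. The name `emit_min_statefully` is hypothetical.

```python
def emit_min_statefully(elements):
    """Keep arrivals in a list; from the second element on, emit the minimum."""
    pending = []
    for element in elements:
        pending.append(element)
        if len(pending) >= 2:
            smallest = min(pending)
            pending.remove(smallest)  # assumption: an emitted value leaves the state
            yield smallest
    # Assumption: flush the remaining state once the input completes.
    yield from sorted(pending)

# Values in the arrival order of the <partition, value> example above.
print(list(emit_min_statefully([3, 5, 7, 6, 9, 8])))  # [3, 5, 6, 7, 8, 9]
# A value arriving later than that slips through out of order.
print(list(emit_min_statefully([5, 6, 1])))           # [5, 1, 6]
```

This works for the example data but only sorts input that is already nearly ordered, so it is not a general replacement for merging by lowest value across the partitions.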
AndreSteenbergen
@AndreSteenbergen
Sep 27 2018 13:00
I get the idea; will try something tonight; thanks for brainstorming with me
I'll look into selectstateful as well
AndreSteenbergen
@AndreSteenbergen
Sep 27 2018 13:05
PartitionWith can also be a contender; I would only need a dynamic number (with max) of TOut ... hmmm so many options. I will work on a POC tonight.
Together with a MergePreferred / MergePrioritized ...
Thx @marcpiechura Marc
AndreSteenbergen
@AndreSteenbergen
Sep 27 2018 17:43
Marc Piechura
@marcpiechura
Sep 27 2018 17:46
👍