These are chat archives for akkadotnet/AkkaStreams

2nd Dec 2016
Marc Piechura
@marcpiechura
Dec 02 2016 05:41
@SeanFarrow A Sink is logically the end of the stream, but from the outside you can't know when the stream actually ends at runtime. That's why, if you look at Sink.ForEach, it materializes into a Task which signals the outside world once the stream is done. You need to return the Task<WriteResult> as the materialized value, but it is created before the stream actually runs, so you have to create a TaskCompletionSource, use it inside your GraphStageLogic, and return its Task as the materialized value. Once the API operation is done, you set the result via the TaskCompletionSource.
It sounds quite similar to Sink.First
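A minimal sketch of the TaskCompletionSource pattern described above. It assumes a recent Akka.NET (1.4/1.5) with the Akka.Streams package and .NET 6+ top-level statements. `CountingSink<T>` is a hypothetical stage written only for illustration. It is not the EventStoreSink or the Azure Queue sink from this chat. It creates the TaskCompletionSource when the graph is materialized, hands out its Task as the materialized value, and completes that Task only when upstream finishes or fails. Built-ins such as Sink.First follow the same idea.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;

var system = ActorSystem.Create("demo");
var materializer = system.Materializer();

// RunWith returns the Task immediately, before any element has flowed;
// the Task only completes once the stream finishes (or fails).
Task<long> counted = Source.From(Enumerable.Range(1, 5))
    .RunWith(new CountingSink<int>(), materializer);

long total = await counted;
Console.WriteLine(total); // 5
await system.Terminate();

// Hypothetical sink for illustration: counts elements and
// materializes the final count as a Task<long>.
public sealed class CountingSink<T> : GraphStageWithMaterializedValue<SinkShape<T>, Task<long>>
{
    private readonly Inlet<T> _in = new Inlet<T>("CountingSink.in");

    public CountingSink() => Shape = new SinkShape<T>(_in);

    public override SinkShape<T> Shape { get; }

    public override ILogicAndMaterializedValue<Task<long>> CreateLogicAndMaterializedValue(Attributes inheritedAttributes)
    {
        // A fresh TaskCompletionSource per materialization, i.e. per Run/RunWith call.
        var promise = new TaskCompletionSource<long>();
        return new LogicAndMaterializedValue<Task<long>>(new Logic(this, promise), promise.Task);
    }

    private sealed class Logic : GraphStageLogic
    {
        private readonly Inlet<T> _in;
        private readonly TaskCompletionSource<long> _promise;
        private long _count;

        public Logic(CountingSink<T> stage, TaskCompletionSource<long> promise) : base(stage.Shape)
        {
            _in = stage._in;
            _promise = promise;
            SetHandler(_in,
                onPush: () => { Grab(_in); _count++; Pull(_in); },
                // Upstream finished normally: publish the result to the outside world.
                onUpstreamFinish: () => { _promise.TrySetResult(_count); CompleteStage(); },
                // Upstream failed: surface the exception through the same Task.
                onUpstreamFailure: ex => { _promise.TrySetException(ex); FailStage(ex); });
        }

        // Request the first element as soon as the stage starts.
        public override void PreStart() => Pull(_in);

        // Safety net: if the stage stops any other way, don't leave the Task pending.
        public override void PostStop() =>
            _promise.TrySetException(new InvalidOperationException("Stage stopped before upstream completed."));
    }
}
```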
Sean Farrow
@SeanFarrow
Dec 02 2016 11:12
@silv3rcircl3 OK, so let's assume I have written a source that emits instances of a class into the stream and also has a materialized value. I understand that the instances I emit flow through the stream, but what happens to the materialized value? Do they flow together as a package (a tuple or something)?
Marc Piechura
@marcpiechura
Dec 02 2016 11:15
No, materialized values are not used inside the stream; they are for the outside world. For example, Source.Queue materializes a queue to which you can add items, which then flow through the stream.
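To make the "for the outside world" point concrete, here is a small sketch using Source.Queue. It assumes a recent Akka.NET (1.4/1.5) with the Akka.Streams package. Keep.Both keeps both materialized values. One is the queue the caller offers elements to. The other is the Task from Sink.ForEach that signals completion. Only the offered integers travel through the stream.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var system = ActorSystem.Create("demo");
var materializer = system.Materializer();
var received = new List<int>();

// Keep.Both keeps the Source's materialized value (the queue) and the Sink's (a completion Task).
var (queue, done) = Source.Queue<int>(16, OverflowStrategy.Backpressure)
    .Select(x => x * 10)
    .ToMaterialized(Sink.ForEach<int>(received.Add), Keep.Both)
    .Run(materializer);

// The queue itself never travels through the stream; the caller uses it to inject elements.
foreach (var i in new[] { 1, 2, 3 })
    await queue.OfferAsync(i);
queue.Complete();

await done; // completes once every offered element has been processed
Console.WriteLine(string.Join(",", received)); // 10,20,30
await system.Terminate();
```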
Sean Farrow
@SeanFarrow
Dec 02 2016 11:20
@silv3rcircl3 Ah, OK, so how do I access the materialized value, given that it doesn't flow through the stream? Any examples of this?
Marc Piechura
@marcpiechura
Dec 02 2016 12:41
@SeanFarrow You can access it inside the GraphDSL if you want to use it within a stream, see http://getakka.net/docs/streams/workingwithgraphs#accessing-the-materialized-value-inside-the-graph
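The linked docs section shows how a materialized value can be fed back into the graph. What follows is a sketch along those lines. It assumes a recent Akka.NET (1.4/1.5) with the Akka.Streams package. Sink.Aggregate materializes a Task<int>. Inside the GraphDSL builder, b.MaterializedValue exposes that Task as an outlet, and SelectAsync unwraps it into an ordinary element. The names sumFlow and unwrap are illustrative only.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var system = ActorSystem.Create("demo");
var materializer = system.Materializer();

// A Flow whose single output element is the sum computed by an inner Sink.
// The inner sink's materialized value (Task<int>) is fed back into the graph
// through b.MaterializedValue.
var sumFlow = Flow.FromGraph(GraphDsl.Create(
    Sink.Aggregate<int, int>(0, (acc, x) => acc + x),
    (b, aggregate) =>
    {
        // Unwrap the Task<int> into a plain int element.
        var unwrap = b.Add(Flow.Create<Task<int>>().SelectAsync(1, t => t));
        b.From(b.MaterializedValue).To(unwrap.Inlet);
        return new FlowShape<int, int>(aggregate.Inlet, unwrap.Outlet);
    }));

int total = await Source.From(Enumerable.Range(1, 4))
    .Via(sumFlow)
    .RunWith(Sink.First<int>(), materializer);

Console.WriteLine(total); // 10
await system.Terminate();
```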
Sean Farrow
@SeanFarrow
Dec 02 2016 20:34
Thanks. Given that each call to the API I'm wrapping returns a result, and I'm returning this as a materialized value, how should I set this value in cases where the supervision strategy doesn't apply? I'm using your Azure Queue Storage code as an example, and I'm using an IEnumerable as the source. So my graph looks like:
Task<WriteResult> t =
Source.From(createdEvents)
.Select(x => ToEventData(x))
.RunWith(new EventStoreSink(this.Connection, "PasswordChanges"), this.Materializer);
I suspect the stage is only returning the last WriteResult, but I want to return all of them. Any advice appreciated.
Marc Piechura
@marcpiechura
Dec 02 2016 21:26
I see. The task my Azure Queue sink returns completes successfully when the stream finishes, or with an exception if the stream fails, so that doesn't help you. What you could do is return a Task<IEnumerable<WriteResult>> which contains either all WriteResults for the events from your Source, or an exception if the stream failed.
And you complete this task once the stream has finished, like I do in my Azure Queue sink.
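If the source is bounded, the built-in Sink.Seq already provides the "Task of all results" shape suggested here. It collects every element and completes its Task when the stream finishes, or faults it if the stream fails. What follows is a sketch that assumes a recent Akka.NET (1.4/1.5) with the Akka.Streams package. WriteAsync is a hypothetical placeholder for the wrapped API call. Every result stays in memory until the stream ends, so this only suits finite, reasonably small sources.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var system = ActorSystem.Create("demo");
var materializer = system.Materializer();

// Sink.Seq materializes a Task of an immutable list holding every element;
// the Task completes when the stream finishes and faults if the stream fails.
var allResults = await Source.From(new[] { 1, 2, 3 })
    .SelectAsync(1, id => WriteAsync(id)) // preserves input order
    .RunWith(Sink.Seq<string>(), materializer);

Console.WriteLine(string.Join(",", allResults)); // written:1,written:2,written:3
await system.Terminate();

// Hypothetical stand-in for the wrapped API call; returns one result per event.
static Task<string> WriteAsync(int eventId) => Task.FromResult($"written:{eventId}");
```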
Bartosz Sypytkowski
@Horusiath
Dec 02 2016 21:28
A list of all WriteResults as the materialized value is a bad idea. This flow may take hours to complete and operate on millions of events.
Marc Piechura
@marcpiechura
Dec 02 2016 21:29
Millions of events given as a list? ;-)
But yes, if the source is that big it's a bad idea ^^
Bartosz Sypytkowski
@Horusiath
Dec 02 2016 21:29
OK, I thought createdEvents was a publisher.
Marc Piechura
@marcpiechura
Dec 02 2016 21:31
Maybe a blocking collection that is filled with the results, and from which you can pull them out, is a better approach then.
Sean Farrow
@SeanFarrow
Dec 02 2016 21:57
@silv3rcircl3 How would you keep this as a materialized value using a blocking collection? Ideally, I would want a materialized value per invocation of the stage. This will run constantly, as it's used to write data from a SignalR hub and from an actor to the EventStore. Although this should probably be modelled as a sink, I'm wondering whether I should model it as a pull stage with a sink afterwards to capture the results. Thoughts? Is there a blocking collection that provides notification of a new item?
Marc Piechura
@marcpiechura
Dec 02 2016 22:26
A materialized value cannot be created per invocation of a stage, only once per Run call. But sure, you could also write it as a Flow stage instead of a Sink, with the signature Flow<EventData, WriteResult>, and then use any stage you want to process the WriteResults.
A BlockingCollection wouldn't notify you when a new value is added, but you could pull from it with Take, which blocks until a new item is available.
Maybe it would help to understand what you want to do with the WriteResults
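Here is a sketch of the BlockingCollection approach, assuming a recent Akka.NET (1.4/1.5) with the Akka.Streams package. The stream pushes each result into a BlockingCollection<T>. A consumer takes items out and blocks while none are available. When the stream's Task finishes, the collection is marked complete. The string results are hypothetical stand-ins for WriteResult values.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var system = ActorSystem.Create("demo");
var materializer = system.Materializer();

// Results leave the stream through a BlockingCollection instead of a materialized value.
var results = new BlockingCollection<string>();

Task done = Source.From(Enumerable.Range(1, 3))
    .Select(i => $"result:{i}") // hypothetical stand-in for a per-event WriteResult
    .RunWith(Sink.ForEach<string>(results.Add), materializer);

// Tell consumers no more items are coming once the stream completes or fails.
_ = done.ContinueWith(t => results.CompleteAdding());

// GetConsumingEnumerable() takes items one by one, blocking while the collection is empty,
// and ends once CompleteAdding() has been called and the collection is drained.
var consumed = new List<string>();
foreach (var r in results.GetConsumingEnumerable())
    consumed.Add(r);

Console.WriteLine(string.Join(",", consumed)); // result:1,result:2,result:3
await system.Terminate();
```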
Sean Farrow
@SeanFarrow
Dec 02 2016 22:33
@silv3rcircl3 That's annoying, as I need to treat it as a true sink 99% of the time but only need the WriteResult very occasionally! Can a flow be a sink?
Marc Piechura
@marcpiechura
Dec 02 2016 22:37
No, because a sink has only an input, while a flow has an input and an output. But how should the stream know when you want the WriteResult and when you don't?
If you decide that in your Flow stage, you could simply ignore the ones you don't need and only push a result downstream when you want it.
Sean Farrow
@SeanFarrow
Dec 02 2016 22:46
@silv3rcircl3 Hmm, that could be tricky. Should I just stick to actors? Would that be easier?
Marc Piechura
@marcpiechura
Dec 02 2016 22:51
I don't know, since I'm not sure what you're trying to achieve ;) Maybe it's too late here, but as I see it you have a source of events, you store them in the EventStore, and then you want to do something with the WriteResult, but not every time, only when certain criteria are met.
So you could simply write it like this: Source -> StoreEvent -> Where(result => Match(result)) -> DoSomethingImportant
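The pipeline above could look roughly like this, assuming a recent Akka.NET (1.4/1.5) with the Akka.Streams package. StoreEventAsync and Match are hypothetical placeholders for the EventStore write and the filtering rule. SelectAsync turns each event into a result, Where drops the uninteresting ones, and Sink.ForEach plays the role of DoSomethingImportant.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var system = ActorSystem.Create("demo");
var materializer = system.Materializer();
var handled = new List<int>();

// Source -> StoreEvent -> Where(result => Match(result)) -> DoSomethingImportant
await Source.From(Enumerable.Range(1, 5))
    .SelectAsync(1, id => StoreEventAsync(id)) // one result per stored event
    .Where(result => Match(result))             // drop the results nobody cares about
    .RunWith(Sink.ForEach<(int EventId, bool Flagged)>(result => handled.Add(result.EventId)), materializer);

Console.WriteLine(string.Join(",", handled)); // 2,4
await system.Terminate();

// Hypothetical helpers standing in for the EventStore write and the filtering rule.
static Task<(int EventId, bool Flagged)> StoreEventAsync(int id) =>
    Task.FromResult((EventId: id, Flagged: id % 2 == 0));

static bool Match((int EventId, bool Flagged) result) => result.Flagged;
```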
Sean Farrow
@SeanFarrow
Dec 02 2016 22:57
That works. What if I don't need a Sink? How do I create a graph then?
Marc Piechura
@marcpiechura
Dec 02 2016 22:59
You always need a sink, but if you don't need to do anything in your sink you can use Sink.Ignore. In my example, though, DoSomethingImportant could be called via Sink.ForEach.
A graph can be created via the GraphDSL; there are many examples in the docs showing how to write graphs with it.
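Here is a sketch of a closed graph built with the GraphDSL, assuming a recent Akka.NET (1.4/1.5) with the Akka.Streams package. A Broadcast splits the stream into two branches. One branch goes through a filter into Sink.ForEach. The other ends in Sink.Ignore because nothing needs to be done with it. Passing the ForEach sink to GraphDsl.Create makes its completion Task the graph's materialized value, so the caller can await it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var system = ActorSystem.Create("demo");
var materializer = system.Materializer();
var evens = new List<int>();

// Passing the ForEach sink to GraphDsl.Create makes its completion Task
// the materialized value of the whole closed graph.
var graph = RunnableGraph.FromGraph(GraphDsl.Create(
    Sink.ForEach<int>(evens.Add),
    (b, evensSink) =>
    {
        var source = b.Add(Source.From(Enumerable.Range(1, 6)));
        var broadcast = b.Add(new Broadcast<int>(2));
        var onlyEvens = b.Add(Flow.Create<int>().Where(x => x % 2 == 0));
        var ignore = b.Add(Sink.Ignore<int>()); // a branch whose output we don't need

        b.From(source.Outlet).To(broadcast.In);
        b.From(broadcast.Out(0)).To(onlyEvens.Inlet);
        b.From(onlyEvens.Outlet).To(evensSink.Inlet);
        b.From(broadcast.Out(1)).To(ignore.Inlet);

        return ClosedShape.Instance;
    }));

await graph.Run(materializer);
Console.WriteLine(string.Join(",", evens)); // 2,4,6
await system.Terminate();
```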
Sean Farrow
@SeanFarrow
Dec 02 2016 23:02
@silv3rcircl3 OK, cool, that works. When would you use a flow versus a graph stage? What do you use to determine which is appropriate?
Marc Piechura
@marcpiechura
Dec 02 2016 23:22
@SeanFarrow I would say that if you have a single "path", e.g. Where -> Select -> GroupBy ..., you should use the normal Flow stages. If you need more advanced features like broadcasting or BidiFlows, you should use the GraphDSL. But it always depends ;-) And it's not either-or; you could also do something like Where -> Via(someVeryComplexGraph) -> GroupBy ...
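The last point can be illustrated as follows, assuming a recent Akka.NET (1.4/1.5) with the Akka.Streams package. A fan-out/fan-in section is built with the GraphDSL and wrapped as an ordinary Flow via Flow.FromGraph. It is then dropped into a linear chain with Via. The Broadcast/Merge layout and the multipliers are made up for the example, and Select stands in for GroupBy to keep the output simple.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var system = ActorSystem.Create("demo");
var materializer = system.Materializer();

// A non-linear piece (fan-out then fan-in) packaged as an ordinary Flow<int, int, NotUsed>.
var fanOutFanIn = Flow.FromGraph(GraphDsl.Create(b =>
{
    var broadcast = b.Add(new Broadcast<int>(2));
    var merge = b.Add(new Merge<int>(2));
    var tens = b.Add(Flow.Create<int>().Select(x => x * 10));
    var hundreds = b.Add(Flow.Create<int>().Select(x => x * 100));

    b.From(broadcast.Out(0)).To(tens.Inlet);
    b.From(tens.Outlet).To(merge.In(0));
    b.From(broadcast.Out(1)).To(hundreds.Inlet);
    b.From(hundreds.Outlet).To(merge.In(1));

    // Expose the broadcast's input and the merge's output as the Flow's ports.
    return new FlowShape<int, int>(broadcast.In, merge.Out);
}));

// Linear stages around it: Where -> Via(complex graph) -> Select
var results = await Source.From(Enumerable.Range(1, 3))
    .Where(x => x != 2)
    .Via(fanOutFanIn)
    .Select(x => x + 1)
    .RunWith(Sink.Seq<int>(), materializer);

// Merge interleaves its inputs nondeterministically, so sort before printing.
var sorted = results.OrderBy(x => x).ToList();
Console.WriteLine(string.Join(",", sorted)); // 11,31,101,301
await system.Terminate();
```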