These are chat archives for akkadotnet/AkkaStreams

10th
Oct 2016
Sean Farrow
@SeanFarrow
Oct 10 2016 19:45
Hi, I'm converting a process from actors to streams. I've got an actor that downloads RSS feeds, which I've converted to a source, but the main reason for having a parent actor was to allow processing: there are several different implementations of the processing logic depending on the specific RSS feed downloaded. In my current actor code I run a foreach that breaks if the specific item I'm looking for is found. I wanted to try using a kill switch to stop any more items flowing through the stream, but I don't think I can do this from within a graph stage. Could someone verify whether this is correct and, if it is, what the best way of handling this is, given that the parsing code currently emits a result back to an actor?
Marc Piechura
@marcpiechura
Oct 10 2016 19:55
@SeanFarrow so you want to stop the stream if your processing logic found a matched item in the RSS feed?
Sean Farrow
@SeanFarrow
Oct 10 2016 20:54
Yes, exactly. Ideally, processing the feed and downloading should be completely separate.
Marc Piechura
@marcpiechura
Oct 10 2016 20:58
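    var matchFound = Source.From(new[] {"no match", "match"})
        .Where(s => s.Equals("match"))                 // keep only matching items
        .RunWith(Sink.First<string>(), materializer);  // Task<string> for the first match

    matchFound.Wait();              // blocks until the stream has produced a result
    var found = matchFound.Result;  // the first matching element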
wouldn't this work?
Sean Farrow
@SeanFarrow
Oct 10 2016 21:26
Possibly, I'll do some testing. Is there a reason you have a matchFound.Wait()? Does this return a task?
Marc Piechura
@marcpiechura
Oct 10 2016 21:30
Yup exactly, the stream will complete the task with the first element that is found and throw an exception if none was found
You can also use FirstOrDefault if you want to avoid the exception
Basically the same behavior as First / FirstOrDefault from LINQ
To be clear, the task is completed with failure if none was found, so task.Wait will throw
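A minimal sketch of the FirstOrDefault variant, assuming the Akka.Streams NuGet package is referenced. The system name "demo" and the sample strings are made up for illustration.

```csharp
using System;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

class FirstOrDefaultSketch
{
    static void Main()
    {
        // "demo" is an arbitrary system name chosen for this sketch.
        using (var system = ActorSystem.Create("demo"))
        {
            var materializer = ActorMaterializer.Create(system);

            // None of the sample elements match, so the task should
            // complete with default(string), i.e. null, instead of failing.
            var matchFound = Source.From(new[] { "no match", "still no match" })
                .Where(s => s.Equals("match"))
                .RunWith(Sink.FirstOrDefault<string>(), materializer);

            var found = matchFound.Result;
            Console.WriteLine(found ?? "(nothing matched)");
        }
    }
}
```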
Sean Farrow
@SeanFarrow
Oct 10 2016 21:41
Lovely, thanks Marc! Can I pass this to another stage in my processing? I'm assuming it returns a Task<T>?
Sean Farrow
@SeanFarrow
Oct 10 2016 21:46
Is there a stage built in to streams that lets me do an if/else type of thing in terms of the next stage in the pipeline? For example, let's say I have a string passed to a stage: if it's null I want to perform one action, and if it has a value I want to do something else.
Marc Piechura
@marcpiechura
Oct 10 2016 21:52
Don't use null as an element, nulls are not allowed in the Reactive Streams protocol! For if/else I would probably use the GraphDsl with a broadcast and a filter stage behind every broadcast output
Sean Farrow
@SeanFarrow
Oct 10 2016 21:55
Ok, that makes sense, I only used the null example as if I use FirstOrDefault, I could get null as a default value.
Marc Piechura
@marcpiechura
Oct 10 2016 21:57
Yeah right, you would need to change null to string.Empty in the task continuation to avoid that
You could use Source.FromTask for further processing
Or continue after the Where with a Take(1) instead of RunWith(Sink.First)
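Both options could look roughly like this. It is only a sketch, assuming the Akka.Streams package; the ToUpperInvariant step is a hypothetical stand-in for whatever processing comes next.

```csharp
using System;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

class ContinueAfterMatchSketch
{
    static void Main()
    {
        using (var system = ActorSystem.Create("demo"))
        {
            var materializer = ActorMaterializer.Create(system);
            var items = new[] { "no match", "match", "match" };

            // Option 1: stay inside the stream. Take(1) completes the stream
            // after the first match and cancels the upstream source.
            var done = Source.From(items)
                .Where(s => s.Equals("match"))
                .Take(1)
                .Select(s => s.ToUpperInvariant()) // hypothetical next step
                .RunForeach(Console.WriteLine, materializer);
            done.Wait();

            // Option 2: materialize a task first, then feed it into a new stream.
            var matchFound = Source.From(items)
                .Where(s => s.Equals("match"))
                .RunWith(Sink.First<string>(), materializer);

            Source.FromTask(matchFound)
                .Select(s => s.ToUpperInvariant()) // hypothetical next step
                .RunForeach(Console.WriteLine, materializer)
                .Wait();
        }
    }
}
```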
Sean Farrow
@SeanFarrow
Oct 10 2016 22:01
Are there any examples of filter stages?
Marc Piechura
@marcpiechura
Oct 10 2016 22:01
Maybe somewhere in the docs :P
Or look at the tests
Ah, sorry
It's called Filter on the JVM, I meant Where
So broadcast.Out1 -> Where(s => s == string.Empty) ...
And broadcast.Out2 -> Where(s => s != string.Empty)
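Spelled out with the GraphDsl, that could look something like the sketch below. It assumes the Akka.Streams package; the sample elements and the console output are invented, and every stage is added through builder.Add so the sinks' materialized values are simply ignored.

```csharp
using System;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

class BroadcastIfElseSketch
{
    static void Main()
    {
        using (var system = ActorSystem.Create("demo"))
        {
            var materializer = ActorMaterializer.Create(system);

            var graph = RunnableGraph.FromGraph(GraphDsl.Create(builder =>
            {
                // Sample data: string.Empty stands in for "no value" (never null).
                var source = builder.Add(Source.From(new[] { "", "value", "", "other" }));
                var broadcast = builder.Add(new Broadcast<string>(2));

                // One Where per branch: together they act as the if / else.
                var empties = builder.Add(Flow.Create<string>().Where(s => s == string.Empty));
                var values = builder.Add(Flow.Create<string>().Where(s => s != string.Empty));

                var onEmpty = builder.Add(Sink.ForEach<string>(_ => Console.WriteLine("empty element")));
                var onValue = builder.Add(Sink.ForEach<string>(s => Console.WriteLine("value: " + s)));

                builder.From(source).To(broadcast.In);
                builder.From(broadcast.Out(0)).Via(empties).To(onEmpty);
                builder.From(broadcast.Out(1)).Via(values).To(onValue);

                return ClosedShape.Instance;
            }));

            graph.Run(materializer);

            // The graph runs asynchronously; keep the process alive until Enter is pressed.
            Console.ReadLine();
        }
    }
}
```

Every element goes to both branches and each Where drops the ones that branch does not handle, so the two predicates should be exact complements.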
Sean Farrow
@SeanFarrow
Oct 10 2016 22:08
Brilliant, I found Filter on the JVM side, I'll have a look at the docs/tests.