These are chat archives for akkadotnet/AkkaStreams

25th
May 2018
Arjen Smits
@Danthar
May 25 2018 13:24 UTC
I'm having a little trouble defining a flow using a Broadcast shape
This is what I'm defining:
 return Flow.FromGraph<Envelope,Envelope,NotUsed>(GraphDsl.Create(b =>
            {
                var broadcast = b.Add(new Broadcast<Envelope>(2));
                b.From(broadcast.Out(0))
                    .Via(Flow.Create<Envelope>().Where(x => x.Error != null))
                    .To(DeleteFrom(queue));
                var successflow = b.From(broadcast.Out(1))
                    .Via(Flow.Create<Envelope>().Where(x => x.Error == null));
                return successflow;
            }));
I want a flow which takes a msg, and either eats it up (the DeleteFrom(queue)) or returns it
so I can reuse it somewhere using the Via construct
However, this does not seem to be the correct syntax for that, and I can't find the right one
Arjen Smits
@Danthar
May 25 2018 13:32 UTC
Seems I can do this, syntax-wise
 return Flow.FromGraph<Envelope,Envelope,NotUsed>(GraphDsl.Create(b =>
            {
                var broadcast = b.Add(new Broadcast<Envelope>(2));
                b.From(broadcast.Out(0))
                    .Via(Flow.Create<Envelope>().Where(x => x.Error != null))
                    .To(DeleteFrom(queue));
                return new FlowShape<Envelope, Envelope>(broadcast.In, broadcast.Out(1));
            }));
but then I'd have to add the Where filter downstream
source.Via(DiscardErrors(queue))
                            .Where(x => x.Error == null);
Although workable, it's not really what I want
Marc Piechura
@marcpiechura
May 25 2018 13:44 UTC
@Danthar
first version with
 return new FlowShape<Envelope, Envelope>(broadcast.In, successflow);
should work
I hope at least, haven't worked with the graph api that often
Arjen Smits
@Danthar
May 25 2018 13:45 UTC
Then it becomes:
 private static Flow<Envelope, Envelope, NotUsed> DiscardErrors(CloudQueue queue)
        {
            return Flow.FromGraph<Envelope,Envelope,NotUsed>(GraphDsl.Create(b =>
            {
                var broadcast = b.Add(new Broadcast<Envelope>(2));
                b.From(broadcast.Out(0))
                    .Via(Flow.Create<Envelope>().Where(x => x.Error != null))
                    .To(DeleteFrom(queue));
                var successflow = b.From(broadcast.Out(1))
                    .Via(Flow.Create<Envelope>().Where(x => x.Error == null));
                return new FlowShape<Envelope, Envelope>(broadcast.In, successflow);
            }));
        }
However, then it gives an error (screenshot: image.png)
It forces me to pass an Outlet there
I cannot directly put in the successflow graph
Marc Piechura
@marcpiechura
May 25 2018 13:48 UTC
could you try to replace .Via(Flow.Create<Envelope>().Where(x => x.Error == null)); with .To(...)
that probably won't work either, but it's worth a try. I'll check the docs to find an example
Arjen Smits
@Danthar
May 25 2018 13:50 UTC
No, not with the Flow.Create anyway. It would have to be a sink
Marc Piechura
@marcpiechura
May 25 2018 13:52 UTC
and successFlow doesn't have .Out or .Outlet property?
Arjen Smits
@Danthar
May 25 2018 13:53 UTC
facepalm
yes it does :P
lol
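For reference, a minimal sketch of the version this exchange arrives at: the FlowShape is built from broadcast.In and the Out outlet of the success branch. The Envelope class and the DeleteFrom() stand-in below are assumptions made for illustration (the real DeleteFrom(queue) takes a CloudQueue and is not shown in the chat), and the sink is mapped to NotUsed so it matches the builder's materialized type.

```csharp
using System;
using System.Threading.Tasks;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

// Hypothetical stand-in for the Envelope type used in the chat.
public sealed class Envelope
{
    public Envelope(int id, string error = null) { Id = id; Error = error; }
    public int Id { get; }
    public string Error { get; }
}

public static class Program
{
    // Hypothetical stand-in for DeleteFrom(queue): it only logs the failed envelope.
    private static Sink<Envelope, NotUsed> DeleteFrom() =>
        Sink.ForEach<Envelope>(e => Console.WriteLine($"discarded: {e.Id}"))
            .MapMaterializedValue(_ => NotUsed.Instance);

    private static Flow<Envelope, Envelope, NotUsed> DiscardErrors()
    {
        return Flow.FromGraph<Envelope, Envelope, NotUsed>(GraphDsl.Create(b =>
        {
            var broadcast = b.Add(new Broadcast<Envelope>(2));

            // Branch 0: envelopes with an error are consumed by the sink.
            b.From(broadcast.Out(0))
                .Via(Flow.Create<Envelope>().Where(x => x.Error != null))
                .To(DeleteFrom());

            // Branch 1: envelopes without an error continue downstream.
            var successflow = b.From(broadcast.Out(1))
                .Via(Flow.Create<Envelope>().Where(x => x.Error == null));

            // FlowShape needs an Outlet, which the builder ops expose as .Out.
            return new FlowShape<Envelope, Envelope>(broadcast.In, successflow.Out);
        }));
    }

    public static async Task Main()
    {
        using (var system = ActorSystem.Create("discard-errors-demo"))
        using (var mat = system.Materializer())
        {
            var input = new[] { new Envelope(1), new Envelope(2, "boom"), new Envelope(3) };

            // The composed flow can now be reused anywhere with Via.
            await Source.From(input)
                .Via(DiscardErrors())
                .RunForeach(e => Console.WriteLine($"processed: {e.Id}"), mat);
        }
    }
}
```

With this shape the Where filter for successful envelopes lives inside DiscardErrors, so callers do not have to repeat it downstream.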
Marc Piechura
@marcpiechura
May 25 2018 13:53 UTC
:)
Arjen Smits
@Danthar
May 25 2018 13:54 UTC
Getting an error from the QueueSource btw: a missing method exception
Building a trivial example to verify it's not my own code
Nope, not my code.
Method not found: 'Void Akka.Streams.Stage.GraphStageLogic.SetHandler(Akka.Streams.Outlet, System.Action, System.Action)'.
Marc Piechura
@marcpiechura
May 25 2018 13:57 UTC
Yeah, that's the old untyped API, we removed it recently
So you have to change that, let me grab the issue
Arjen Smits
@Danthar
May 25 2018 13:58 UTC
ah, ok
using WindowsAzure.Storage::8.4.0 where current dev unittests are still running against WindowsAzure.Storage::7.1.2 in case it matters
Marc Piechura
@marcpiechura
May 25 2018 14:00 UTC
akkadotnet/akka.net#3231
A simple recompile could work
Arjen Smits
@Danthar
May 25 2018 14:01 UTC
of the Alpakka source?
Marc Piechura
@marcpiechura
May 25 2018 14:01 UTC
Yup
Arjen Smits
@Danthar
May 25 2018 14:01 UTC
ok
Marc Piechura
@marcpiechura
May 25 2018 14:03 UTC
'SetHandler(Outlet)' was replaced with 'SetHandler<T>(Outlet<T>)'
If you get a compile error it’s a bug ;-)
Arjen Smits
@Danthar
May 25 2018 14:04 UTC
compile works
now to test my sample
Marc Piechura
@marcpiechura
May 25 2018 14:04 UTC
fingers crossed
Arjen Smits
@Danthar
May 25 2018 14:06 UTC
nope
same error
are those changes released yet ?
Marc Piechura
@marcpiechura
May 25 2018 14:06 UTC
Have you updated Akka reference in the Alpakka project?
Arjen Smits
@Danthar
May 25 2018 14:06 UTC
could be I'm building against an older version of akka.streams
yeah that would be it
doh
it says it's using the latest stable akka.streams, 1.3.1
but that can't be right
god damn nuget
ah there we go. Local nuget packagesources are a bitch
Arjen Smits
@Danthar
May 25 2018 14:11 UTC
yes
works now
hmm. Does the developmentqueue source stop when there are no more messages in the queue ?
It should keep going
Arjen Smits
@Danthar
May 25 2018 14:16 UTC
yeah, I noticed. Maybe it's how I define my flow
instead of creating a RunnableGraph I use RunWith
So: var work = flow.RunWith(Sink.ForEach<Program.Envelope>(x => { Console.WriteLine($"processed: {x.ParsedMessage.Id}"); }),mat);
Marc Piechura
@marcpiechura
May 25 2018 14:18 UTC
That's fine, RunWith is only shorthand for flow.ToMaterialized(sink, Keep.Right).Run(mat)
Why do you think it stops?
Are you awaiting the task in work?
Arjen Smits
@Danthar
May 25 2018 14:18 UTC
no
I'm not doing anything with work. However, for some reason it's still completed
Marc Piechura
@marcpiechura
May 25 2018 14:19 UTC
But how do you know it's completed if you don't do anything with it? :)
Arjen Smits
@Danthar
May 25 2018 14:20 UTC
well, I add more messages to it
but they don't show up
oh lol
I think I know what's going on
Marc Piechura
@marcpiechura
May 25 2018 14:20 UTC
Note that RunWith isn't blocking, so if you're in a console app and don't wait for the task, the app will exit and close the console
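A minimal sketch of that point, assuming a finite in-memory source instead of the storage queue source from the chat: RunWith returns the sink's materialized task right away, and awaiting it keeps the console app alive until the stream has finished.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class Program
{
    public static async Task Main()
    {
        using (var system = ActorSystem.Create("runwith-demo"))
        using (var mat = system.Materializer())
        {
            // RunWith returns immediately with the sink's materialized value,
            // here the task produced by Sink.ForEach.
            var work = Source.From(Enumerable.Range(1, 5))
                .RunWith(Sink.ForEach<int>(x => Console.WriteLine($"processed: {x}")), mat);

            // Without this await, Main could return before the stream has finished.
            await work;
        }
    }
}
```

For a source that never completes, such as a polling queue source, the task would not finish on its own, so something like Console.ReadLine() is one way to keep the process running.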
Arjen Smits
@Danthar
May 25 2018 14:20 UTC
a combination of the dequeue functionality of the storage queue and my polling interval of 10 seconds
ok, it's working like a charm ^^
thx for the help
for now I can downgrade to akka 1.3.1 until we release an updated version of alpakka
Aaron Stannard
@Aaronontheweb
May 25 2018 14:23 UTC
reminds me
are the Alpakka packages all published through AkkaNetContrib?
Arjen Smits
@Danthar
May 25 2018 14:23 UTC
any specific reason why that's not done? As in, blocking stuff?
Aaron Stannard
@Aaronontheweb
May 25 2018 14:23 UTC
or Akka.NET?
Arjen Smits
@Danthar
May 25 2018 14:23 UTC
let me check
Aaron Stannard
@Aaronontheweb
May 25 2018 14:23 UTC
because I'll need to track down @alexvaluyskiy if it's the former
Arjen Smits
@Danthar
May 25 2018 14:24 UTC
akkanetcontrib
heh, awesome. This PoC for a real-world issue at work is just what I needed to get some of my coworkers excited about akka.streams & akka. Awesome