These are chat archives for akkadotnet/AkkaStreams

29th
May 2018
geiziry
@geiziry
May 29 2018 09:58
I've linear stream of source=>f1=>f2=>f3=> sink
How can I delay f3 until f1 is complete
Marc Piechura
@marcpiechura
May 29 2018 10:20
@geiziry could you explain in a bit more detail what you’re trying to do?
geiziry
@geiziry
May 29 2018 11:31
@marcpiechura source= of log files
f1= I parse those files to objects
f2= loop through each file's events as first step to generate a report.
f3= look for related events in other files (so all files have to be already parsed in f1) as second step to generate a report
sink= get the processed event and write to the output.
Marc Piechura
@marcpiechura
May 29 2018 11:37
And depends f3 on f2 or could it be connected to f1 directly?
geiziry
@geiziry
May 29 2018 11:44
Yes, depends f3 on f2, because f2 will specify which event will need to be looked for in other files.
Marc Piechura
@marcpiechura
May 29 2018 12:14
Alright so a simple solution could maybe work like this f1 read file and create object -> f2 generate report -> f3 Sink.Aggregate aggregate all incoming reports into one or multiple / f4 which is connected to f2 via .AlsoTo(Sink.Foreach), iterates of all reports and prints all elements
So for f3 you need a stateful function which checks all reports and extracts the relevant events, Aggregate could do
I‘m on the phone so it’s a little bit hard to explain
geiziry
@geiziry
May 29 2018 14:10
@marcpiechura thank you very much for your kind help, I'll try it and let you know if it worked
geiziry
@geiziry
May 29 2018 14:55

@marcpiechura but then f3 and f4 are not connected
this is the graph code to give clearer picture of what I'm trying to achieve

var g = RunnableGraph.FromGraph(GraphDsl.Create(b =>
            {
                var source = Source.From(viewModel.LogFiles);

                var sink = Sink.ActorRef<Tuple<bool, LogRptDto>>(logFilesExcelProviderActor, logFileRptGeneratorService.GetReportRows());//send all list of LogRptDto with checkouts and checkins to fill excel file

                var parsing = Flow.Create<LogFile>()
                                .Select(x =>
                                {
                                    viewModel.OverallProgress++;
                                    return logFilesParsingService.ParseLogFileEventsAsync(x).Result;
                                });//parse log files (list of events checkouts, checkins)

                var reportGen = Flow.Create<LogFile>()
                                .SelectAsyncUnordered(int.MaxValue, logFileRptGeneratorService.GenerateReport)
                                .SelectMany(x => x);//check of checkins related to checkouts within same files and fill list of LogRptDto, and return list of LogRptDto with checkouts without checkins from same file to be checked in other files

                var getCheckIns = Flow.Create<LogRptDto>()
                                .SelectAsyncUnordered(int.MaxValue, l => logFileRptGeneratorService.GetCheckInforInUseOuts(l, viewModel.LogFiles));//check list of LogRptDto to get checkins from other logfiles


                b.From(source).Via(parsing).Via(reportGen).Via(getCheckIns).To(sink);

                return ClosedShape.Instance;
            }));

            g.Run(Context.Materializer());

so, I'm filling events in list of LogFile, and from this I'm filling List of LogRptDto which then used to print to output
and problem here that sometimes the flow reaches to getCheckIns before parsing is complete for all logFiles