These are chat archives for akkadotnet/AkkaStreams

18th
Sep 2018
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 10:15
Is there a way to figure out whether PostStop has been called on a sink? I have a graph that ends in a Kafka sink. I end the graph with SourceActor.Tell(new Status.Success("done")). If I am correct, this should stop the stream. The Kafka producer goes to PostStop and flushes its messages. Is there any way I can wait for completion of the flush? Or should I just stop the actor that holds the graph and rely on PostStop working correctly?
Vasily Kirichenko
@vasily-kirichenko
Sep 18 2018 11:29
Why do you not use a KillSwitch?
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 11:30
Euhm, what is the benefit of that? I have a sourceActor, so telling it success should do the trick, right?
So I don't actually know why I am not using KillSwitch

In my PreStart I have the following

var webpageEventSource = Source.ActorRef<KeyValuePair<Guid, IEvent<Guid>>>(1024, OverflowStrategy.DropTail);
var sink = KafkaProducer.PlainSink(producerSettings);
var graph = webpageEventSource
            .Via(EventToMetaAndMessage.CreateFlow<Guid, IEvent<Guid>>(eventsTopic))
            .To(sink);
SourceActor = Context.Materializer().Materialize(graph);

In my PostStop I have the following code

SourceActor.Tell(new Status.Success("done"));
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 11:40
I am in doubt about what happens when I close the actor containing this sourceActor. I would like to close all resources correctly and have all messages posted to Kafka ;) And always waiting for the flush timeout is something I would not like to do.
Vasily Kirichenko
@vasily-kirichenko
Sep 18 2018 11:44
I think you should use WatchTermination on the stream in your PostStop.
I mean you should wait on the Task that WatchTermination returns when you materialize the stream.
ah, wait, it may not guarantee that all messages have reached the sink. Or does it?
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 11:46
If the stream terminates, I guess so
Vasily Kirichenko
@vasily-kirichenko
Sep 18 2018 11:46
If not, you need a Flow rather than a sink, and use .Via(flow)
maybe.
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 11:47
I could use the Flow from kafka instead of PlainSink
and create a Sink to Context.Self
That would solve my problem I guess.
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 11:56
I just created a flow instead of a sink; the sink is now:
var actorSink = Sink.ActorRef<DeliveryReport<..>>(Self, StreamCompleteTick.Instance);
I keep reference to the last task, which I can await
Vasily Kirichenko
@vasily-kirichenko
Sep 18 2018 11:59
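// usings this snippet relies on
using System.Linq;
using System.Threading;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using static System.Console; // for the unqualified Write/WriteLine calls

var system = ActorSystem.Create("test");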
var mat = ActorMaterializer.Create(system);

var terminated =
    Source
        .From(Enumerable.Range(1, 10))
        .Buffer(1_000, OverflowStrategy.Backpressure)
        .WatchTermination(Keep.Right)
        .To(Sink.ForEach<int>(x =>
        {
            Write($"got {x}, sleeping...");
            Thread.Sleep(200);
            WriteLine("done");
        }))
        .Run(mat);

terminated.ContinueWith(__ => WriteLine("Terminated"));
system.WhenTerminated.Wait();
got 1, sleeping...done
got 2, sleeping...done
got 3, sleeping...done
got 4, sleeping...done
got 5, sleeping...done
got 6, sleeping...done
got 7, sleeping...done
got 8, sleeping...done
got 9, sleeping...done
got 10, sleeping...done
Terminated
so it seems that WatchTermination is safe: the task completes only after the sink has processed every element
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 12:01
Thanks for the fast test. I have a sourceActor, so I just wait until that actor dies, I guess.
Context.Materializer().Materialize(graph);
Vasily Kirichenko
@vasily-kirichenko
Sep 18 2018 12:02
I'm not sure. However, I've never used source actors.
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 12:06
I am going to write a small test. I think I have enough to get something working, thx
I will post my findings
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 12:21
Hummmm, little stuck ...
        var system = ActorSystem.Create("test");
        var mat = ActorMaterializer.Create(system);


        var source = Source.ActorRef<int>(10, OverflowStrategy.DropTail);
        var graph = source
                .WatchTermination(Keep.Right)
                .To(Sink.ForEach<int>(x =>
                {
                    Console.Write($"got {x}, sleeping...");
                    Thread.Sleep(200);
                    Console.WriteLine("done");
                }));
How can I get the actor ref to send the ints to?
Ah wait, maybe I don't need that now, I can watch the source ...
Vasily Kirichenko
@vasily-kirichenko
Sep 18 2018 12:24
Keep.Both
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 12:24
thx
Works the same:
        var system = ActorSystem.Create("test");
        var mat = ActorMaterializer.Create(system);


        var source = Source.ActorRef<int>(10, OverflowStrategy.DropTail);
        var graph = source
                .WatchTermination(Keep.Both)
                .To(Sink.ForEach<int>(x =>
                {
                    Console.Write($"got {x}, sleeping...");
                    Thread.Sleep(200);
                    Console.WriteLine("done");
                }));
        var (actorRef, terminated) = mat.Materialize(graph);

        actorRef.Tell(0);
        actorRef.Tell(1);
        actorRef.Tell(2);
        actorRef.Tell(3);
        actorRef.Tell(4);
        actorRef.Tell(5);
        actorRef.Tell(6);
        actorRef.Tell(7);
        actorRef.Tell(8);
        actorRef.Tell(9);
        actorRef.Tell(new Status.Success("done"));

        terminated.ContinueWith(__ => Console.WriteLine("Terminated"));
        system.WhenTerminated.Wait();
got 0, sleeping...done
got 1, sleeping...done
got 2, sleeping...done
got 3, sleeping...done
got 4, sleeping...done
got 5, sleeping...done
got 6, sleeping...done
got 7, sleeping...done
got 8, sleeping...done
got 9, sleeping...done
Terminated
Vasily Kirichenko
@vasily-kirichenko
Sep 18 2018 12:30
yeah. But why not use Source.Queue? :)
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 12:31
I didn't know of that Source ....
I'll read documentation on Source.Queue
Vasily Kirichenko
@vasily-kirichenko
Sep 18 2018 12:32
var source = Source.Queue<int>(10, OverflowStrategy.DropTail);
var (queue, term) = source
    .WatchTermination(Keep.Both)
    .To(Sink.ForEach<int>(x =>
    {
        Write($"got {x}, sleeping...");
        Thread.Sleep(200);
        WriteLine("done");
    }))
    .Run(mat);

for (var i = 0; i < 10; i++)
    queue.OfferAsync(i).Wait();

queue.Complete();
term.ContinueWith(__ => WriteLine("Terminated"));
system.WhenTerminated.Wait();
works the same
it has a bounded buffer, and with OverflowStrategy.Backpressure it's backpressured if that's exceeded
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 12:33
Ah ok, thx
Vasily Kirichenko
@vasily-kirichenko
Sep 18 2018 12:34
but you should never call OfferAsync from multiple threads simultaneously
it will result in an exception
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 12:34
ok, it's all in 1 actor, so I don't need to worry about that
Vasily Kirichenko
@vasily-kirichenko
Sep 18 2018 12:34
and it returns an offer result: success or not. I've no idea what to do if it returns failure tho
ok
AndreSteenbergen
@AndreSteenbergen
Sep 18 2018 12:35
thanks a lot, this is exactly what I need
PostStop is called correctly for all actors and logic stages, so the flush will get called. I have faith in that
Vasily Kirichenko
@vasily-kirichenko
Sep 18 2018 12:36
yeah, it's a workhorse for integrating Streams with the outside world
Bartosz Sypytkowski
@Horusiath
Sep 18 2018 16:04

but you should never call OfferAsync from multiple threads simultaneously

@vasily-kirichenko are you sure?

Vasily Kirichenko
@vasily-kirichenko
Sep 18 2018 17:24
@Horusiath however, I cannot get exceptions in a test, see https://gist.github.com/vasily-kirichenko/55481886d20eb17e5ed97ac9293cc30b
maybe it's fixed?