These are chat archives for akkadotnet/AkkaStreams

Sep 2018
Sep 21 2018 05:51

Does anyone have an idea how a sourceQueue can get detached?

I had this error in the logs today, I had a long running Actor, containing a source queue with a sink to Kafka producer. I don't have any errors besides this:

---> (Inner Exception #0) Akka.Pattern.IllegalStateException: Stream is terminated. SourceQueue is detached.<---

The only thing I did see was the idle connection reaper messages for the producer, but I got those in the past as well. Without any issues. The actor restarted ok and all but a few the messages got persisted, as far as I can see.

Bartosz Sypytkowski
Sep 21 2018 08:11

@Horusiath I think it should not be async at all. What are you gonna do if it returns false? If it's back pressured, Offer should just block.

@vasily-kirichenko The issue here is that in theory you could just call OfferAsync billion times without awaiting: even thou you may just drop task that is returned, the queue itself still will need to keep it internally without any upper bound defined (otherwise it won't be able to complete it) and piling up in memory. What source queue actually does, is to queue the offers (think: offer tasks, not elements), up to a given limit, after which offers will be just dropped.

Vasily Kirichenko
Sep 21 2018 10:40
I have a bunch of Sources. Is it possible to merge them all into a single stream?
Sep 21 2018 11:04
Shouldn't that be possible with a Zip?
Is this a good method to get async messages into my actor?
    protected override void PreStart()

        //create a FileSystemWatcher; every <<Guid>>.json in the directory is important
        var config = Context.System.Settings.Config.GetString("");
        var source = Source.Queue<FileSystemEventArgs>(100, OverflowStrategy.DropTail);

        var sink = Sink.ActorRef<FileSystemEventArgs>(Self, "Done");

        var mat = ActorMaterializer.Create(Context);
        (queue, streamTermination) = source.WatchTermination(Keep.Both).To(sink).Run(mat);

        watcher = new FileSystemWatcher
            Path = config,
            NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.FileName,
            Filter = "*.json"
        watcher.Changed += OnChanged;
        watcher.EnableRaisingEvents = true;
Marc Piechura
Sep 21 2018 11:07
Why not simply call self.Tell inside the change handler ?
Sep 21 2018 11:08
ok, overcomplicating things
Thomas Lazar
Sep 21 2018 19:51
Hi there. I just started derping around with streams and I wrote a little test program that isn't working as I thought it would be.
Is basically just a IActorRef source that should transform the data it gets to a string and send it to a file as csv format.
When everything is wired up properly (but probably not) then I have a for loop that sends 1000 messages to that actor source that i expect to land in said csv file. But it always ends after 868 lines. Sometimes I get a 869 line that has some truncated data in it. So... what am I doing wrong?
Sep 21 2018 21:08
Can't seem to see anything really wrong .. Have you tried writing something simpler? just the int?
Is anyone using Kafka streams package as a source/ sink or flow? I have run into some issue with the producer. After 10 minutes of idle time an error is raised, after which the Stage end with FailStage. I think that's strange behviour for Kafka as a producer this should be too big of a deal. I know, I could make a producer that wouldn't be idling for 10 minutes, but that is another issue. I need to refactor some code.
Sep 21 2018 21:20
or am I missing somthing?