These are chat archives for akkadotnet/AkkaStreams

28th
Sep 2018
AndreSteenbergen
@AndreSteenbergen
Sep 28 2018 11:25

I have a question: I am reading the sources for MergeSorted. Shouldn't the comparison check for < 0 instead of == -1?

        private void Dispatch(T left, T right)
        {
            if (_stage._compare(left, right) == -1)
            {
                _other = right;
                Emit(_stage.Out, left, _readLeft);
            }
            else
            {
                _other = left;
                Emit(_stage.Out, right, _readRight);
            }
        }

Bartosz Sypytkowski
@Horusiath
Sep 28 2018 11:36
@AndreSteenbergen you're right
there's no guarantee that a comparison will return exactly -1 (many implementations will straight up return any negative value, just for performance)
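
To illustrate the point, here is a minimal sketch with no Akka.NET involved. The names `ByDifference`, `LeftFirstBuggy` and `LeftFirstFixed` are hypothetical and only serve the example; the comparer simply subtracts, which is one common way a comparison ends up returning a negative value other than -1.

```csharp
using System;

public static class CompareSignDemo
{
    // Hypothetical comparer: returns the difference, so the result can be
    // any negative, zero, or positive value (fine for small ints, no overflow here).
    public static int ByDifference(int left, int right) => left - right;

    // Mirrors the check quoted above: only an exact -1 counts as "left first".
    public static bool LeftFirstBuggy(Func<int, int, int> compare, int left, int right)
        => compare(left, right) == -1;

    // Sign-based check: any negative value counts as "left first".
    public static bool LeftFirstFixed(Func<int, int, int> compare, int left, int right)
        => compare(left, right) < 0;

    public static void Main()
    {
        Console.WriteLine(ByDifference(1, 5));                   // -4
        Console.WriteLine(LeftFirstBuggy(ByDifference, 1, 5));   // False
        Console.WriteLine(LeftFirstFixed(ByDifference, 1, 5));   // True
    }
}
```

With the `== -1` check, 1 would be treated as not smaller than 5, so a merge built on it would emit the right element first and break the ordering.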
AndreSteenbergen
@AndreSteenbergen
Sep 28 2018 11:44
Should I raise a GitHub issue?
AndreSteenbergen
@AndreSteenbergen
Sep 28 2018 13:44
I am migrating my Kafka topics from one server to another. I would like to use more partitions, so I can't just read and dump. I have created a Partitioner and Merge graph to keep all messages in order. This seems to work for the messages, but the task at line 152 never completes (see gist: https://gist.github.com/AndreSteenbergen/700a25c862e291486c1e351b588f79ff). Does anyone have an idea? I used a KillSwitch so downstream knows there are no new messages coming. I see the messages, the EOFs, and the shutdown message. I just never see the "I am done" message....
I am changing the serializer as well, from JSON to msgpack
AndreSteenbergen
@AndreSteenbergen
Sep 28 2018 14:01
Ok, I did some searching and found the Scala version: https://stackoverflow.com/questions/47599617/wait-for-completion-of-closedshape-stream. I need to add source.WatchTermination. How do I add a WatchTermination if I used a GraphDsl builder?
Marc Piechura
@marcpiechura
Sep 28 2018 16:45
@AndreSteenbergen I think you could use it the same way you used the KillSwitch; to create it, Flow.Create<>().WatchTermination() should work
AndreSteenbergen
@AndreSteenbergen
Sep 28 2018 17:50
Thx, that works; sadly the task from the Kafka Source never terminates, even after the KillSwitch has been shut down. I think I'll need to add handling for when downstream cancels
AndreSteenbergen
@AndreSteenbergen
Sep 28 2018 18:25
Yep, the Kafka source stage logic didn't handle downstream completion. I'll add that to the Pull Request I made for Kafka.
Marc Piechura
@marcpiechura
Sep 28 2018 18:30
That’s probably the one I still have to review, I hope I’ll find some time over the weekend
AndreSteenbergen
@AndreSteenbergen
Sep 28 2018 19:01
np, that's oss life ;)
I changed my many inputs into one MergeSorted as follows; it seems to work quite well.
    private static Outlet<T> MergeSortedOutlets<S, T>(GraphDsl.Builder<S> builder, List<Outlet<T>> currentSetOutlets, Func<T, T, int> fn)
    {
        while (currentSetOutlets.Count > 1)
        {
            var newOutlets = new List<Outlet<T>>();

            for (int i = 0; i < currentSetOutlets.Count; i += 2)
            {
                if (i + 1 == currentSetOutlets.Count)
                {
                    newOutlets.Add(currentSetOutlets[i]);
                }
                else
                {
                    var merger = builder.Add(new MergeSorted<T>(fn));

                    builder.From(currentSetOutlets[i]).To(merger.In0);
                    builder.From(currentSetOutlets[i + 1]).To(merger.In1);

                    newOutlets.Add(merger.Out);
                }
            }
            currentSetOutlets = newOutlets;
        }

        return currentSetOutlets[0];
    }
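
The pairing scheme in the helper above can be sketched without Akka.NET. This is only an illustration under assumptions: `ReducePairwise` is a hypothetical name, and string concatenation stands in for wiring a `MergeSorted` stage between two outlets.

```csharp
using System;
using System.Collections.Generic;

public static class PairwiseReduceDemo
{
    // Combines neighbours two at a time, round after round, until one item is left.
    // An odd item at the end of a round is carried over unchanged.
    public static T ReducePairwise<T>(IReadOnlyList<T> items, Func<T, T, T> combine)
    {
        if (items.Count == 0)
            throw new ArgumentException("At least one item is required.", nameof(items));

        var current = new List<T>(items);
        while (current.Count > 1)
        {
            var next = new List<T>();
            for (int i = 0; i < current.Count; i += 2)
            {
                if (i + 1 == current.Count)
                    next.Add(current[i]);                           // odd one out
                else
                    next.Add(combine(current[i], current[i + 1]));  // one "merger"
            }
            current = next;
        }
        return current[0];
    }

    public static void Main()
    {
        var inputs = new List<string> { "p0", "p1", "p2", "p3", "p4" };
        // Prints: (((p0+p1)+(p2+p3))+p4)
        Console.WriteLine(ReducePairwise(inputs, (a, b) => $"({a}+{b})"));
    }
}
```

For n inputs this uses n - 1 combine steps (four for the five inputs here), and the sketch throws on an empty list rather than indexing into it.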
@marcpiechura are you doing all the Alpakka PRs? I only saw comments on the AMQP v1.0 one, not the Kafka variant