These are chat archives for akkadotnet/AkkaStreams

2nd
Dec 2017
metachro
@metachro
Dec 02 2017 20:26
Hi folks, I'm trying to understand a weird(?) behavior I'm seeing with what I thought would be a trivial test app. I tried to do something like https://github.com/Horusiath/AkkaDemos/tree/master/reactive-streams/Demo.SqlReader except that instead of using an actor publisher, I used a method that returns an IEnumerable (via yield return of rows read from a data reader) and consumed it with Source.From(). What I'm seeing is that it gets "stuck" after a while, but increasing the SyncProcessingLimit makes it run longer before that happens. Does this have something to do with the fact that Source.From() lets each downstream subscriber always see the enumeration from the beginning? (I have only one thing behind it for the test, which is just a Sink.ForEach() that prints.)
Bartosz Sypytkowski
@Horusiath
Dec 02 2017 22:55
@metachro I guess your processing flow has deadlocked, but it's hard to advise anything without seeing it. Can you show the code?
metachro
@metachro
Dec 02 2017 23:01
Absolutely. Apologies in advance for the inelegant code; I'm only trying to build a POC. Let me see if I can get the format right... protected IEnumerable<Unpivoted<ValueTuple<int, Guid>, int, string>> GetDataFromSource() { using (var conn = new SqlConnection(_ConnString)) using (var cmd = conn.CreateCommand()) { cmd.CommandText = _SqlCommandText; conn.Open(); using (var r = cmd.ExecuteReader()) { while (r.Read()) { yield return new Unpivoted<ValueTuple<int, Guid>, int, string> { ObjectKey = ValueTuple.Create(r.GetInt32(0), r.GetGuid(1)), DomainKey = r.GetInt32(2), DomainValue = r.GetString(3) }; } } conn.Close(); } }
ugh... obviously not.
*facepalm* OK, maybe my first question should've been "how do I post code here". Sorry about the spam.
metachro
@metachro
Dec 02 2017 23:07
public class Unpivoted<TObjectKey, TDomainKey, TDomainValue>
{
    public TObjectKey ObjectKey;
    public TDomainKey DomainKey;
    public TDomainValue DomainValue;
}

IEnumerable<Unpivoted<ValueTuple<int, Guid>, int, string>> GetDataFromSource()
{
    using (var conn = new SqlConnection(_ConnString))
    using (var cmd = conn.CreateCommand())
    {
        cmd.CommandText = _SqlCommandText;
        conn.Open();
        using (var r = cmd.ExecuteReader())
        {
            while (r.Read())
            {
                yield return new Unpivoted<ValueTuple<int, Guid>, int, string>
                {
                    ObjectKey = ValueTuple.Create(r.GetInt32(0), r.GetGuid(1)),
                    DomainKey = r.GetInt32(2),
                    DomainValue = r.GetString(3)
                };
            }
        }
        conn.Close();
    }
}

using (var s = ActorSystem.Create("system"))
using (var m = s.Materializer(ActorMaterializerSettings.Create(s).WithDebugLogging(true).WithSyncProcessingLimit(10000000)))
{
    var g = RunnableGraph.FromGraph(
        GraphDsl.Create(b =>
        {
            int i = 0;
            var src = b.Add(Source.From(GetDataFromSource()).Buffer(4096, OverflowStrategy.Backpressure));
            var snk = b.Add(Sink.ForEach<Unpivoted<ValueTuple<int, Guid>, int, string>>((f) => { Console.WriteLine(i + " " + f.ObjectKey); ++i; }));

            b.From(src).To(snk);
            return ClosedShape.Instance;
        }));

    var z = g.Run(m);
}
metachro
@metachro
Dec 02 2017 23:18
@Horusiath I tried to pull out the relevant bits, but I guess the formatter still doesn't like it much. So this code runs and prints stuff out, but how far it gets before getting stuck depends on the value of SyncProcessingLimit: the higher the value, the further along it gets.