These are chat archives for akkadotnet/akka.net

28th
Apr 2017
Keith Nicholas
@keithn
Apr 28 2017 01:52
@fanoI I have actors that deal with serial ports, akka is perfect for devices connected via all kinds of things across different machines
Maxim Cherednik
@maxcherednik
Apr 28 2017 09:06
hey guys, I'm trying to get into akka streams. I've got a slow producer / fast consumer situation. The producer's semantics are pull based: it's a datastore with an interface like this: IEnumerable<long> GetElements(long sequenceId); The goal is to take those elements and process them somehow in a sink. Let's imagine the sink takes 1 second to consume an event, while the producer takes 10 seconds to fetch the data. Will akka streams somehow understand that my consumer is way faster and signal the producer to collect data in advance?
This is what I am trying to do:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

class Program
{
    static void Main(string[] args)
    {
        using (var sys = ActorSystem.Create("Reactive-Test"))
        {
            using (var mat = sys.Materializer())
            {
                // Each stream element is one batch of longs read from the data store.
                Source<IEnumerable<long>, NotUsed> intsSource = Source.FromEnumerator(() => GetData().GetEnumerator());

                var streamTask = intsSource.RunForeach(ints =>
                {
                    Console.WriteLine("===================== Consumer processed " + ints.Last());
                }, mat);

                streamTask.Wait();
            }
        }
    }

    private static IEnumerable<IEnumerable<long>> GetData()
    {
        var dataProvider = new DataProvider();
        var latestSequenceId = 1;
        while (true)
        {
            Console.WriteLine("+++++++++++++++++++++ Producer Start " + latestSequenceId);
            Thread.Sleep(10000); // I want to emulate slow producer, fast consumer
            Console.WriteLine("+++++++++++++++++++++ Producer Ready " + latestSequenceId);
            yield return dataProvider.GetInts(latestSequenceId);
            latestSequenceId += 1000;
        }
    }
}
Marc Piechura
@marcpiechura
Apr 28 2017 09:15
@maxcherednik The sink sends a demand signal upstream to the source, which says that it is able to process another item. So if your sink is faster, a demand signal is available in your producer and it will push the new item once available. If the producer has an item available but no demand signal has arrived, it will "backpressure" and wait until it receives a demand signal. So yes, if your consumer is faster it's all push based.
But you need to change intsSource.RunForeach to intsSource.Async().RunForeach. The reason is that during materialization the stages are fused together and all run in the same actor/task if no async boundary is detected, so your Thread.Sleep call will also block the sink. .Async() introduces such an async boundary, so the source and sink run on different actors/tasks.
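A minimal sketch of that change, assuming Akka.Streams is referenced. SlowProducer is a hypothetical stand-in for the data store, not code from this discussion, and the one-second sleep only simulates a slow read:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

class AsyncBoundarySketch
{
    // Hypothetical stand-in for the slow, pull-based data store.
    static IEnumerable<long> SlowProducer()
    {
        for (long id = 1; id <= 5; id++)
        {
            Thread.Sleep(1000); // simulated slow read; blocks the thread running the source
            yield return id;
        }
    }

    static void Main()
    {
        using (var sys = ActorSystem.Create("async-boundary-sketch"))
        using (var mat = sys.Materializer())
        {
            var done = Source.FromEnumerator(() => SlowProducer().GetEnumerator())
                .Async() // async boundary: the source and the sink no longer share one actor
                .RunForeach(id => Console.WriteLine("Consumer processed " + id), mat);

            done.Wait(); // the enumerator is finite here, so the stream completes
        }
    }
}
```

Without the .Async() call the source and the RunForeach sink would be fused, so the sleep would stall the consumer as well.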
Maxim Cherednik
@maxcherednik
Apr 28 2017 09:20
@Silv3rcircl3 ah... I see. Let me think ...

So how will this work in my case:

a demand signal is available in your producer and it will push the new item once available

My producer is pull based, but there is lots of data ready to be pulled, let's say.... Do I need to change my producer somehow?

Maxim Cherednik
@maxcherednik
Apr 28 2017 09:25
Another strange part, which I don't really understand:
+++++++++++++++++++++ Producer Start 1
+++++++++++++++++++++ Producer Ready 1
+++++++++++++++++++++ Producer Start 1001
+++++++++++++++++++++ Producer Ready 1001
+++++++++++++++++++++ Producer Start 2001
===================== Consumer processed 1000
+++++++++++++++++++++ Producer Ready 2001
===================== Consumer processed 2000
+++++++++++++++++++++ Producer Start 3001
+++++++++++++++++++++ Producer Ready 3001
===================== Consumer processed 3000
+++++++++++++++++++++ Producer Start 4001
My consumer starts processing only after the second Producer's read...
Marc Piechura
@marcpiechura
Apr 28 2017 09:31
.Async() adds a Buffer stage, so your producer can push up to 16 elements by default before it gets backpressured, and the sink will pull the elements from the buffer
I think the problem is that the Thread.Sleep also blocks the buffer, so it only has a short window of time to push the elements to the sink
I would rewrite it so that you use an actor as source or Source.Queue
Marc Piechura
@marcpiechura
Apr 28 2017 09:36
so no Thread.Sleep anymore ;)
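A rough sketch of the Source.Queue variant, under a few assumptions: the buffer size of 16, the loop of five elements and the Task.Delay standing in for a slow read are all made up for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

class SourceQueueSketch
{
    static async Task RunAsync()
    {
        using (var sys = ActorSystem.Create("source-queue-sketch"))
        using (var mat = sys.Materializer())
        {
            // Keep.Both exposes the queue handle and the sink's completion task.
            var materialized = Source.Queue<long>(16, OverflowStrategy.Backpressure)
                .ToMaterialized(
                    Sink.ForEach<long>(id => Console.WriteLine("Consumer processed " + id)),
                    Keep.Both)
                .Run(mat);

            var queue = materialized.Item1;
            var done = materialized.Item2;

            for (long id = 1; id <= 5; id++)
            {
                await Task.Delay(1000);     // simulated slow read that does not block a thread
                await queue.OfferAsync(id); // completes once the stream has dealt with the offer
            }

            queue.Complete(); // signal that no more elements will arrive
            await done;       // wait until the sink has processed everything
        }
    }

    static void Main() => RunAsync().GetAwaiter().GetResult();
}
```

Here the producer code pushes into the stream from outside, and with OverflowStrategy.Backpressure the task returned by OfferAsync is what slows the producer down when the buffer is full.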
Maxim Cherednik
@maxcherednik
Apr 28 2017 10:32
@Silv3rcircl3 I realized something :) It doesn't matter how slow my producer is. My storage holds way more data, which means it's already somehow "faster" than my consumer. I just don't understand how to wire it so that it pulls the data all the time...
Maxim Cherednik
@maxcherednik
Apr 28 2017 10:42
Managed to do something
private static IEnumerable<IEnumerable<long>> GetData()
        {
            var dataProvider = new DataProvider();
            var latestSequenceId = 1;
            while (true)
            {
                Console.WriteLine("+++++++++++++++++++++ Producer get " + latestSequenceId);
                // Thread.Sleep(10000); // this sleep should be inside the dataProvider, to emulate the slowness of the db
                yield return dataProvider.GetInts(latestSequenceId);
                latestSequenceId += 1000;
            }
        }
Maxim Cherednik
@maxcherednik
Apr 28 2017 11:09
eh.. still can't achieve it
So I guess what I want to achieve is this. The producer should go full speed and provide as many elements as it can.... Meanwhile the consumer also goes full speed.... and if the producer turns out to be too fast, it will be notified to slow down
for some reason I don't see it in the logs :)
Rik
@rikbosch
Apr 28 2017 11:29

Hi, is it preferable to use Immutable<T> for response types in a grain interface?

e.g.

  Task<Immutable<IList<LocationResultViewModel>>> PlacesSearch(string searchKey);

furthermore, as the IList<> is immutable.. Is it better to return an IReadOnlyList?

errm.. wrong gitter group.. my bad
Maxim Cherednik
@maxcherednik
Apr 28 2017 11:44
Reading the documentation here:
http://getakka.net/docs/streams/builtinstages

it's stated that:

FromEnumerator
Stream the values from an Enumerator, requesting the next value when there is demand. The enumerator will be created anew for each materialization, which is the reason the method takes a function rather than an enumerator directly.

If the enumerator performs blocking operations, make sure to run it on a separate dispatcher.

emits the next value returned from the enumerator

so I guess this is exactly what I observe... my Thread.Sleep is blocking
Marc Piechura
@marcpiechura
Apr 28 2017 12:22
@maxcherednik you have it out of the box, everything will run as fast as it can but NOT faster ;-)
If you want some sort of buffering you can use a buffer stage, for example if your consumer is really fast but may have some "lags" now and then. That way your producer can produce new elements even if the consumer is slow, at least until the buffer is full
Yup, Thread.Sleep is tricky for this purpose, as I said I would try it with Source.Queue or Source.FromActor or whatever it is called exactly
Maxim Cherednik
@maxcherednik
Apr 28 2017 12:26
ok, but somehow still struggling to connect things together...
reading and reading documentation
re: Thread.Sleep. I still think it's quite ok to use it to simulate the slowness.... but there is a problem - my fast consumer demands only once
and that's why the Producer is not generating the next element until the previous one is consumed.
and the problem here is that I don't understand what to expect from it :)
Maxim Cherednik
@maxcherednik
Apr 28 2017 13:06
is it me who wired it wrong, or is it working as designed?
Maxim Cherednik
@maxcherednik
Apr 28 2017 13:18
@Silv3rcircl3 nothing helps :(
using (var sys = ActorSystem.Create("Reactive-Test"))
            {
                using (var mat = sys.Materializer())
                {
                    var streamTask = Source.FromEnumerator(() => GetData().GetEnumerator())
                                    .Buffer(100, OverflowStrategy.Backpressure)
                                    .Async()
                                    .RunForeach(ints =>
                                    {
                                        Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " === Consumer processed " + ints);
                                    }, mat);

                    streamTask.Wait();
                }
            }
It pulls only one element from the source...
Maxim Cherednik
@maxcherednik
Apr 28 2017 13:29
I also noticed another strange part. Even with the Async boundary involved, I see the slow producer blocking an element which was already fetched; it gets stuck somewhere...
Maxim Cherednik
@maxcherednik
Apr 28 2017 14:33
@Silv3rcircl3 ok, I guess I figured it out more or less for now:)
got another q. I didn't manage to find it from the documentation. Akka.Streams and async/await are not friends, right?
for instance, when I have a sink where I need to write to the db and the interface is like this: Task<int> SaveAsync();
how to await?
Bartosz Sypytkowski
@Horusiath
Apr 28 2017 14:58
@maxcherednik source.SelectAsync(1, async entity => await db.SaveAsync(entity)).To(Sink.Ignore<int>())
Maxim Cherednik
@maxcherednik
Apr 28 2017 14:59
ah
thanks... will try
Bartosz Sypytkowski
@Horusiath
Apr 28 2017 14:59
Sink.Ignore is for when you're not interested in the result of saving
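A self-contained sketch of that pattern, with assumptions stated up front: SaveAsync below is a hypothetical stand-in for the database call, the source of ten numbers is invented, and the parallelism of 4 is an arbitrary choice.

```csharp
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

class SelectAsyncSketch
{
    // Hypothetical stand-in for an asynchronous database write.
    static async Task<int> SaveAsync(long entity)
    {
        await Task.Delay(100); // simulated I/O latency
        return 1;              // pretend one row was written
    }

    static void Main()
    {
        using (var sys = ActorSystem.Create("select-async-sketch"))
        using (var mat = sys.Materializer())
        {
            var done = Source.From(Enumerable.Range(1, 10).Select(i => (long)i))
                // Up to 4 saves may be in flight at once; results are emitted in input order.
                .SelectAsync(4, entity => SaveAsync(entity))
                // RunWith materializes and starts the stream; the task completes with the stream.
                .RunWith(Sink.Ignore<int>(), mat);

            done.Wait();
        }
    }
}
```

SelectAsync takes the task-returning function directly, so nothing is awaited inside the stage, and the parallelism argument bounds how many saves run concurrently.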