These are chat archives for akkadotnet/AkkaStreams

9th
Nov 2017
Abdelmawla Mohamed
@abdomohamed
Nov 09 2017 00:08

Hi,

Let's say I have an actor that maintains its state using snapshots, so in case of any failure the actor will be restarted and the snapshot will be replayed. In addition, this actor will create a stream for doing some data migration processing. The source of this stream will be populated from data saved in the snapshot offer, if there is any; if not, it will be fetched from somewhere else (an API call). Given that scenario, I'd like to restart the actor if any failure happens within the stream, so I'm wondering how this can be achieved? Also, is this a good approach for solving the problem?

Boban
@bobanco
Nov 09 2017 01:11
@abdomohamed you can use RestartSource with backoff for that purpose
Abdelmawla Mohamed
@abdomohamed
Nov 09 2017 01:44
Thanks @bobanco, but how can I let the actor read the latest state from the db?
Is there a way to read the snapshot from the db and feed it into the source?
Boban
@bobanco
Nov 09 2017 01:53
Are you using a persistent actor for snapshots? If yes, you could build a projection which also stores the offset, so when you restart the projection you will always know where you have to continue. For building the projection you have to use the Persistence Query API.
Abdelmawla Mohamed
@abdomohamed
Nov 09 2017 01:55
Yes I do. Do you have an example of building a projection from Akka snapshots?
Boban
@bobanco
Nov 09 2017 01:56
yes that one
Abdelmawla Mohamed
@abdomohamed
Nov 09 2017 01:59
It seems this is available only for SQL Server, is this right? If yes, how can I do the same for MongoDB?
Do I have to implement my own thing :)
Boban
@bobanco
Nov 09 2017 02:00
I don't know if there is any read journal for MongoDB
Abdelmawla Mohamed
@abdomohamed
Nov 09 2017 02:06
Hey @Horusiath @Aaronontheweb do you know if there is a Persistence Query implementation for MongoDB?
@bobanco thanks a lot for answering my question.
Aaron Stannard
@Aaronontheweb
Nov 09 2017 16:51
@abdomohamed not that I know of
we'd need to just add some meta-data to the Alpakka CSPROJ to publish that
Boban
@bobanco
Nov 09 2017 17:02
@Aaronontheweb ok, I will make some updates today to the AMQP implementation to support Ack/Nack
Aaron Stannard
@Aaronontheweb
Nov 09 2017 22:10
sounds good
I'll check the repo in the AM tomorrow and see about doing a version bump if those changes pass muster
Boban
@bobanco
Nov 09 2017 23:59
@Horusiath, @Silv3rcircl3 Source.Queue.OfferAsync() doesn't return QueueOfferResult.Failure when the stream fails, is that normal behaviour?
example:
// mat is an existing materializer (its creation is not shown here)
var tup = Source.Queue<int>(0, OverflowStrategy.Backpressure)
    .ToMaterialized(Sink.ForEach<int>(x =>
    {
        // fail the stream on the first element
        if (x == 1)
            throw new Exception("blah blah");
        Console.WriteLine(x);
    }), Keep.Both)
    .Run(mat);

var queueSource = tup.Item1;
var sinkCompletedTask = tup.Item2;
sinkCompletedTask.ContinueWith(x =>
{
    x.Wait();
    Console.WriteLine("sink completed");
});

queueSource.WatchCompletionAsync().ContinueWith(async x =>
{
    try
    {
        await x;
        Console.WriteLine("Queue completed successfully");
    }
    catch (Exception e)
    {
        Console.WriteLine("Queue completed with exception: {0}", e.Message);
    }
});

// offer the failing element first, then one more after the failure
var result = queueSource.OfferAsync(1).Result;
Console.WriteLine(result);
result = queueSource.OfferAsync(2).Result;
Console.WriteLine(result);
Console.ReadLine();