These are chat archives for akkadotnet/akka.net

11th
Feb 2016
Hyungho Ko
@hhko
Feb 11 2016 01:02
@Horusiath thank you very much for your time and consideration.
Jaben Cargman
@Jaben
Feb 11 2016 02:40
Quick question: how do I force a reassociation, or at least an attempt, with remoting? If it disassociates for some reason, I'd like it to attempt to reconnect after, say, 30 seconds.
I see, supervisor strategy. Never mind.
Jaben Cargman
@Jaben
Feb 11 2016 02:52
Added Context.Watch(Self) on my Actor and it reconnects.
Stuart Cam
@codebrain
Feb 11 2016 03:40
Afternoon gents
    public class AskRoutingActor<T> : ReceiveActor
    {
        public AskRoutingActor(Func<T, string> actorName, Props props)
        {
            Receive<T>(async message =>
            {
                var actorPath = actorName(message);
                var actor = await Context.FindOrCreate(actorPath, props);
                actor.Tell(message);
            });
        }
    }
    public class DictionaryRoutingActor<T> : ReceiveActor
    {
        private readonly Dictionary<string, IActorRef> _routes = new Dictionary<string, IActorRef>();

        public DictionaryRoutingActor(Func<T, string> actorName, Props props)
        {
            Receive<T>(message =>
            {
                var actorPath = actorName(message);
                if (_routes.ContainsKey(actorPath))
                {
                    _routes[actorPath].Tell(message);
                    return;
                }

                var actor = Context.ActorOf(props, actorPath);
                actor.Tell(message);
                _routes.Add(actorPath, actor);
            });
        }
    }
Asking vs. maintaining a dictionary... pros and cons?
    public static class ActorExtensions
    {
        public static async Task<IActorRef> FindOrCreate(this IUntypedActorContext context, string actorPath, Props props)
        {
            var actorIdentity = await context.ActorSelection(actorPath).Ask<ActorIdentity>(new Identify(null), TimeSpan.FromSeconds(10));
            var actor = actorIdentity?.Subject ?? context.ActorOf(props, actorPath);
            return actor;
        }
    }
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 06:38
@codebrain the only problem I see there is that you're recreating a more expensive version of something that akka gives you for free ;)
// find or create actor by name
var child = Context.Child(actorName);
if(Equals(child, ActorRefs.Nobody)) {
    child = Context.ActorOf(props, actorName);
}
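A minimal sketch of how the find-or-create snippet above could replace both routing actors. The class name ChildRoutingActor is hypothetical; the rest reuses only calls already shown in this conversation (Context.Child, Context.ActorOf, Tell).

```csharp
using System;
using Akka.Actor;

// Hypothetical name: ChildRoutingActor does not appear in the discussion above.
public class ChildRoutingActor<T> : ReceiveActor
{
    public ChildRoutingActor(Func<T, string> actorName, Props props)
    {
        Receive<T>(message =>
        {
            var name = actorName(message);

            // Context.Child yields ActorRefs.Nobody when no child with this name exists,
            // so no Ask round trip and no separate dictionary are needed.
            var child = Context.Child(name);
            if (Equals(child, ActorRefs.Nobody))
            {
                child = Context.ActorOf(props, name);
            }

            child.Tell(message);
        });
    }
}
```

The lookup is local to the parent's own child collection, which is why it avoids the cost of the Identify/Ask version. The value returned by actorName has to be a valid actor name, since it is passed straight to ActorOf.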
Zetanova
@Zetanova
Feb 11 2016 09:32
Does it make sense to host WorkflowFramework in akka? I would need a good state machine for long-term processes like Order/Transfers
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 09:34
@Zetanova you mean building a workflow framework in akka?
Zetanova
@Zetanova
Feb 11 2016 09:49
@Horusiath To create a PR that saves and manages WF4.5 applications, instead of coding it into the actor itself
The main difference is that WF already has good diagrams and versioning. If I built it directly in akka, only developers would understand it.
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 09:52
my guess is that you won't get akka performance using WWF, but integration plugin between these two is always something good to have
I never believed in click-it-yourself frameworks for non-developers ;)
Zetanova
@Zetanova
Feb 11 2016 10:09
Perf. is not required. WWF has Activities that I can develop and make a workflow out of (by code or designer). It will still function for new versions and is very isolated. Maybe I'm doing something wrong, but a long-running ProcessManager in akka looks very fragile and complicated to me. AR with CQRS/ES is a different story. What I need currently is just a "Transfer" that is created and can be approved later. It will then create all required accounts and transactions. In the Order/Purchase category it's the same thing. Maybe I am missing some scheduler feature (an index of the processes, like in WWF) to handle long-running processes better.
Arjen Smits
@Danthar
Feb 11 2016 10:20
@Zetanova sounds like you want to build sagas
Or maybe a routing slip
But if you want to build a user-configurable ProcessManager then WWF sounds like a good fit. Don't envy you for having to shoehorn that into your infrastructure though.
Zetanova
@Zetanova
Feb 11 2016 10:23
yes, I'm just missing some features in akka, like managers for idle/resume, versioning and tracking. Of course I could implement them myself ... it's getting a little bit too much
Arjen Smits
@Danthar
Feb 11 2016 10:25
Some examples for persistent sagas in akka would be nice, yup. But I wouldn't go as far as to say it's a missing feature in akka.
You could build something like that with akka. It wouldn't even be that hard.
It's just that if you include versioning it gets a lot harder.
I've been flirting with Automatonymous from the MassTransit guys.
Seems like it would be worthwhile building something with akka that integrates with that. But I haven't taken a deeper look at it.
John Nicholas
@MrTortoise
Feb 11 2016 10:31
@Horusiath lo is there anything like AroundReceive on the sending side of things? I'm thinking of using AroundReceive to unwrap some kind of generic wrapper ... but it would be nice to have an AroundTell to automatically wrap stuff
Zetanova
@Zetanova
Feb 11 2016 10:31
thx, I will look at it. My biggest problem is that I need to finally get to the BLL and stay there, but somehow new technologies keep appearing one after another.
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 10:32
@Zetanova if I recall, I gave you a link about sagas in akka on the JVM like a week or two ago
@MrTortoise how would you override those? Tell is an interface method, AroundReceive is a virtual implementation
you'd need to create another wrapper on IActorRef, resulting in additional layer of indirection - I'm not saying it's always bad, but you need a good reason to do so
John Nicholas
@MrTortoise
Feb 11 2016 10:35
@Horusiath well I have no idea ;) but I was wondering if some kind of hook was already there before I went down a rabbit hole
@Horusiath the immediate application that came to my mind was in effect a poor man's authentication in some kind of envelope, so the sender and receiver don't need to constantly handle it in every receive and tell
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 10:36

@MrTortoise from yesterday:

you can make your own - in some cases you need to wrap Tell with custom logic. This is how the Deliver method works in at-least-once-delivery semantics
it's basically public void Post(IActorRef aref, object message) => aref.Tell(new CorrelatedWrapper(CurrentCorrelationId, message));
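A rough sketch of how the quoted Post one-liner could pair with AroundReceive, so that wrapping happens on send and unwrapping happens before the normal handlers run. The shape of CorrelatedWrapper (a Guid plus the payload) and the base class name CorrelatedActor are assumptions for illustration; only Post, CorrelatedWrapper and CurrentCorrelationId are named in the quote.

```csharp
using System;
using Akka.Actor;

// Assumed shape: the quote names CorrelatedWrapper but does not show its members.
public sealed class CorrelatedWrapper
{
    public CorrelatedWrapper(Guid correlationId, object message)
    {
        CorrelationId = correlationId;
        Message = message;
    }

    public Guid CorrelationId { get; }
    public object Message { get; }
}

// Hypothetical base class; actors that want correlation derive from it.
public abstract class CorrelatedActor : ReceiveActor
{
    protected Guid CurrentCorrelationId { get; private set; }

    // Sending side: an explicit helper instead of trying to override Tell.
    protected void Post(IActorRef aref, object message) =>
        aref.Tell(new CorrelatedWrapper(CurrentCorrelationId, message), Self);

    // Receiving side: unwrap the envelope, remember the id, then dispatch the payload.
    protected override bool AroundReceive(Receive receive, object message)
    {
        var wrapper = message as CorrelatedWrapper;
        if (wrapper != null)
        {
            CurrentCorrelationId = wrapper.CorrelationId;
            return base.AroundReceive(receive, wrapper.Message);
        }

        return base.AroundReceive(receive, message);
    }
}
```

Derived actors register handlers for the unwrapped message types as usual and call Post instead of Tell whenever the correlation id should travel with the message.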

Arjen Smits
@Danthar
Feb 11 2016 10:36
@Horusiath the JVM has a persistent sagas example?
John Nicholas
@MrTortoise
Feb 11 2016 10:36
cheers ;) I think that's what I was looking for. Should have remembered that myself from playing with at-least-once, tbh!
Zetanova
@Zetanova
Feb 11 2016 10:38
@Horusiath Yes u did, I forgot about it a bit after my AngularJS UI excursion
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 10:38
@Danthar probably more than one
Arjen Smits
@Danthar
Feb 11 2016 10:40
nice. Didn't know that
alexgoeman
@alexgoeman
Feb 11 2016 10:52
Question related to persistence being in beta. Does it mean that it is not yet recommended for use in production? If not recommended, can I look up somewhere what the current issues are? Or the planned changes? Purely out of interest: Persistence looks "very" important to me, so how come it has not yet come out of beta (or am I wrong)? Is this a strategic choice or a resource issue? If it is not recommended, what is the current best practice for persistence (e.g. just a classical save and load from a database)?
Zetanova
@Zetanova
Feb 11 2016 11:05
@alexgoeman the main feature of akka persistence is the support for event sourcing. It is currently working well. But over the past versions there were some bugs (including my own) where I needed to drop the Event Journal
Michel van den Berg
@promontis
Feb 11 2016 11:08
@Zetanova did you lose data because of that?
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 11:09
@alexgoeman right now Akka.Persistence is quite stable, but it still needs to be upgraded to keep in line with its JVM counterpart - I guess that there will be some changes, but from the current perspective nothing so heavy that it could break existing journal backends.
journal implementations will need to be updated as well
voltcode
@voltcode
Feb 11 2016 12:21
@Horusiath what are the reasons? Were there serious bugs in the Scala version that made their way to .NET?
Alexis Narváez
@Narvalex
Feb 11 2016 12:56
Just a quick question. Is the Event Journal in Akka something like an Event Store from Greg Young?
alexgoeman
@alexgoeman
Feb 11 2016 12:58
OK, thanks, so I conclude it is not irresponsible to use it in production. What do you think is currently mostly used for persistence in production akka.net apps? Classic database or event sourcing? Both seem reasonable choices, but somehow I tend towards event sourcing because of its advantages and its nice fit with actors. Lack of experience with event sourcing makes it difficult to decide, though :-)
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 13:33
@voltcode it's not strictly a bug. I would say it's continuous improvement - replacing old approaches with better ones, removing limiting features. If you don't want to end up with an overblown API, at some point you need to decide what should be removed. E.g. obsoleting synchronous journals, as they have no real use case among existing implementations, or standardizing naming conventions (as some of the concepts from the domain are unnecessarily represented by different words).
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 13:39
@Narvalex the event journal is an akka class used as an adapter to store events in some kind of persistent backend, effectively abstracting the backend details from the rest of the system. EventStore is a full-blown event sourcing engine and also a persistent backend, so its capabilities often overlap with Akka.Persistence. However, it doesn't have a notion of actors. You can still use it with akka actors through a persistence plugin, which exposes the EventStore backend as an akka journal.
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 13:51
@alexgoeman this really depends on the use case. For the agent model, Event Sourcing is good because it has a good reliability/performance ratio. But if you don't need ES features, you can store your actors' state as simple entities in a data store. In this case the big question usually is "when to persist the actor's state in the data store", and there are multiple different answers:
  • If you store state after each message that altered it, you may lose some performance, as storing a whole object graph is usually heavier than storing a simple event. This is also inherent to CRUD-based data stores. Event Sourcing can take advantage of append-only stores to gain some speedups.
  • You can store state in the actor's PostStop and restore it in PreStart - but this may be risky in some scenarios, as you'll lose state when an unexpected shutdown occurs, e.g. a process kill or the whole VM going down.
  • Another option is to make a scheduled save request triggered every X seconds - this way you can limit the risk of data loss while still having better performance than saving on each state change. Actually, this was the approach @Aaronontheweb used in the past at his company, inside his data analytics engine.
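A minimal sketch of the third option (scheduled saves), assuming a hypothetical ICounterStore abstraction over whatever data store is in use; the actor, message and store names are all illustrative and not taken from the discussion.

```csharp
using System;
using Akka.Actor;

// Hypothetical store abstraction; stands in for a real database or file-backed store.
public interface ICounterStore
{
    int Load(string id);
    void Save(string id, int value);
}

public class PeriodicallySavedCounter : ReceiveActor
{
    public sealed class Increment { }

    private sealed class SaveTick
    {
        public static readonly SaveTick Instance = new SaveTick();
    }

    private readonly string _id;
    private readonly ICounterStore _store;
    private readonly TimeSpan _saveInterval;
    private ICancelable _saveTimer;
    private int _count;
    private bool _dirty;

    public PeriodicallySavedCounter(string id, ICounterStore store, TimeSpan saveInterval)
    {
        _id = id;
        _store = store;
        _saveInterval = saveInterval;

        Receive<Increment>(_ => { _count++; _dirty = true; });
        Receive<SaveTick>(_ => SaveIfDirty());
    }

    protected override void PreStart()
    {
        // Restore the last saved state, then ask the scheduler for a tick every interval.
        _count = _store.Load(_id);
        _saveTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
            _saveInterval, _saveInterval, Self, SaveTick.Instance, Self);
    }

    protected override void PostStop()
    {
        // Best-effort final save; this does not run on a process kill.
        _saveTimer?.Cancel();
        SaveIfDirty();
    }

    private void SaveIfDirty()
    {
        if (!_dirty) return;
        _store.Save(_id, _count); // synchronous call kept for brevity
        _dirty = false;
    }
}
```

With this shape, the data at risk is whatever changed since the last tick, so the interval is the knob that trades durability against write load.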
Alexis Narváez
@Narvalex
Feb 11 2016 14:10
@Horusiath Thanks for your answer. Regarding your answer to @alexgoeman, I want to add that it is true that appending only events to the store is fast, but when you need to left-fold past events to get the current state it could take some time if the stream is big... The technique of snapshotting at intervals seems good. In my project I save both the events and the snapshot after each transaction, and I also keep the updated snapshot in memory so I can read from it faster than by replaying past events. What do you think?
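A sketch of what "event plus snapshot after each transaction" might look like with Akka.Persistence, under the assumption that a journal and snapshot store are configured and can serialize these types. AccountActor, Deposit, Deposited, GetBalance and AccountState are hypothetical names, not the project's actual code.

```csharp
using Akka.Actor;
using Akka.Persistence;

public sealed class Deposit
{
    public Deposit(long cents) { Cents = cents; }
    public long Cents { get; }
}

public sealed class Deposited
{
    public Deposited(long cents) { Cents = cents; }
    public long Cents { get; }
}

public sealed class GetBalance { }

public sealed class AccountState
{
    public AccountState(long balanceCents) { BalanceCents = balanceCents; }
    public long BalanceCents { get; }
}

public class AccountActor : UntypedPersistentActor
{
    // In-memory copy of the latest state; reads never touch the store.
    private AccountState _state = new AccountState(0);

    public AccountActor(string accountId)
    {
        PersistenceId = "account-" + accountId;
    }

    public override string PersistenceId { get; }

    protected override void OnRecover(object message)
    {
        var offer = message as SnapshotOffer;
        if (offer != null)
        {
            _state = (AccountState)offer.Snapshot;
            return;
        }

        // Only events newer than the offered snapshot are replayed here.
        var deposited = message as Deposited;
        if (deposited != null) Apply(deposited);
    }

    protected override void OnCommand(object message)
    {
        var deposit = message as Deposit;
        if (deposit != null)
        {
            Persist(new Deposited(deposit.Cents), e =>
            {
                Apply(e);
                SaveSnapshot(_state); // snapshot after every persisted event
            });
            return;
        }

        if (message is GetBalance)
        {
            Sender.Tell(_state, Self);
        }
        // SaveSnapshotSuccess / SaveSnapshotFailure are ignored in this sketch.
    }

    private void Apply(Deposited e)
    {
        _state = new AccountState(_state.BalanceCents + e.Cents);
    }
}
```

Saving a snapshot on every event keeps recovery short at the cost of one extra write per command; snapshotting every N events is the usual middle ground.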
Zetanova
@Zetanova
Feb 11 2016 14:15
@promontis one thing led to another. I extended an event (I am still in development) and got serialization errors. I thought it was the change in the event ... it was the Newtonsoft.Json 8.0.2 update.
Hyungho Ko
@hhko
Feb 11 2016 14:17
How do I use akka.net with Reactive Extensions?
Is it possible?
Is it a good way to implement a UI program?
If it isn't a good way, can you recommend a good alternative?
Zetanova
@Zetanova
Feb 11 2016 14:25
@hhko Yes, I am doing it. It depends on what you want to do. The most important thing is that u stick to the same thread as the actor in the RX observer. An important feature of RX is that it is by default NOT concurrent, so stick to it and u will not have any problems with your actor context
Hyungho Ko
@hhko
Feb 11 2016 14:30
@Zetanova Could you show me a good example of Rx together with Akka.net?
Zetanova
@Zetanova
Feb 11 2016 14:47
public class PNetObserverActor : ActorBase
    {
        readonly IObserver<Object> observer;

        public static Props Props(IObserver<Object> observer)
        {
            return Akka.Actor.Props.Create(() => new PNetObserverActor(observer))
                .WithDeploy(Deploy.Local);
        }

        public PNetObserverActor(IObserver<Object> observer)
        {
            this.observer = observer;
        }

        protected override bool Receive(object message)
        {
            message.Match()
                .With<PNetEvents.Completed>(OnCompleted)
                .With<PNetEvents.Failed>(OnError)
                .Default(m => observer.OnNext(m));

            return true;
        }

        protected bool Done(object message) { return false; }

        void OnError(PNetEvents.Failed msg)
        {
            Context.Become(Done);
            observer.OnError(msg.Cause);
        }

        void OnCompleted(PNetEvents.Completed msg)
        {
            Context.Become(Done);
            observer.OnCompleted();
        }
    }
public class PNetInbox : IObservable<Object>, IDisposable
    {
        private static int inboxNr = 0;

        readonly ActorSystem system;
        readonly IObservable<Object> source;
        //readonly TimeSpan defaultTimeout;

        public IActorRef Receiver { get; private set; }

        public static PNetInbox Create(ActorSystem system)
        {
            //var config = system.Settings.Config.GetConfig("akka").GetConfig("actor").GetConfig("inbox");
            //var inboxSize = config.GetInt("inbox-size");
            //var timeout = config.GetTimeSpan("default-timeout");

            var subject = new PNetQueueSubject<Object>(TaskPoolScheduler.Default);

            var receiver = system.ActorOf(PNetObserverActor.Props(subject), "pbox-" + Interlocked.Increment(ref inboxNr));

            var inbox = new PNetInbox(receiver, system, subject);
            return inbox;
        }

        private PNetInbox(IActorRef receiver, ActorSystem system, IObservable<Object> source)
        {
            //this.defaultTimeout = defaultTimeout;
            this.system = system;
            this.source = source;
            Receiver = receiver;
        }

        public void Send(ICanTell actorRef, object msg)
        {
            actorRef.Tell(msg, Receiver);
        }

        public IDisposable Subscribe(IObserver<object> observer)
        {
            return source
                .Timeout(TimeSpan.FromSeconds(30)) //test
                .Subscribe(observer); //maybe timeout
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        bool disposed;
        protected virtual void Dispose(bool disposing)
        {
            if (disposed)
                return;

            //Contract.Assert();

            if (disposing)
                system.Stop(Receiver);

            disposed = true;
        }

        ~PNetInbox()
        {
            Dispose(false);
        }
    }
simple and good
in actor processing u can use RX, but most likely u can and should achieve the same behavior
Zetanova
@Zetanova
Feb 11 2016 14:53
public IObservable<TResult> Call<TResult>(object message, CancellationToken cancellationToken = default(CancellationToken), IProgress<PNetExecutionProgress> progress = null)
        {
            return Observable.Using(() => PNetInbox.Create(System), inbox =>
            {
                if (cancellationToken.IsCancellationRequested)
                    return PNetObservable.Cancel<TResult>(cancellationToken);

                var source = Observable.Create<Object>(o => 
                {
                    IActorRef target = ActorRefs.Nobody;
                    //PNetExecutionProgress  p = null;
                    var cancel = Disposable.Empty;

                    return inbox.Subscribe(
                        onNext: n => 
                        {
                            n.Match()
                                .With<PNetEvents.Started>(m =>
                                {
                                    target = m.Source;
                                    if (cancellationToken != CancellationToken.None)
                                        cancel = cancellationToken.Register(() => inbox.Send(target, PNetCommands.Cancel.Instance));

                                    if (progress != null)
                                    {
                                        var p = new PNetExecutionProgress();
                                        progress.Report(p);
                                    }
                                })
                                .With<PNetEvents.Progressed>(m => 
                                {
                                    if (progress != null)
                                    {
                                        m.Match()
                                            .With<PNetExecutionProgress>(progress.Report);
                                        //todo maybe log unkown progress value
                                    }
                                })
                                .Default(m => o.OnNext(m));
                        }, 
                        onError: ex => 
                        {
                            //todo progress
                            target = ActorRefs.Nobody;
                            cancel.Dispose();
                            o.OnError(ex); 
                        }, 
                        onCompleted: () => 
                        {
                            //todo progress
                            target = ActorRefs.Nobody;
                            cancel.Dispose();
                            o.OnCompleted();
                        });
                });

                inbox.Send(Reference, message);

                return source.OfType<TResult>();
            });
        }
@hhko see above
Don't have that much RX inside actors anymore, but this Inbox is very useful
Hyungho Ko
@hhko
Feb 11 2016 15:20
@Zetanova thank you for your consideration and good code^^;
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 15:59
@Narvalex interesting idea, however in case of strong failure (like process kill) you'll need to recover from all the events
the same applies if you move the actor to another machine
nonetheless it's still an interesting approach
Alexis Narváez
@Narvalex
Feb 11 2016 16:51
@Horusiath Why would you need to recover from all events? If you store a snapshot after processing a message, and the system goes down, you lose just your in-memory snapshot. When restarting from cold you can just rehydrate your actor/aggregate from the stored snapshot. Events are infinitely cacheable, so there is no reason to look back (only if you want to change your snapshot definition)
Alexis Narváez
@Narvalex
Feb 11 2016 17:00
I wonder how big a snapshot can be... I have the feeling that if an actor/aggregate has a really big snapshot... maybe it is doing too much work. Maybe we can delegate some work to another actor that listens to it?
Zetanova
@Zetanova
Feb 11 2016 17:25
@Narvalex If you have a heavy object, u can still use some external resource, like a table or a memory-mapped file in the GB range, and just save the ResourceIdentifier into the PersistenceStore/Snapshot
Zetanova
@Zetanova
Feb 11 2016 17:40
or of course separate the data over multiple actors (if it's possible with the data u are dealing with). You shouldn't worry too much about it; even if your snapshot is in the double-digit MB range it would perform "ok"
Zetanova
@Zetanova
Feb 11 2016 18:05
@Horusiath Is there something like an outdated-snapshot algorithm in the SnapshotStore? I mean for the case where the SnapshotStore couldn't save or serialize a snapshot before the next one arrives.
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 18:20

you lose just your snapshot in-memory. When restarting from cold you can just rehydrate your actor/aggregate from the snapshot

So are you snapshotting both in memory and in persistent store on each state change?

@Zetanova in akka the SnapshotStore is an actor and incoming snapshot save requests are messages, so they are processed in the order they come from a particular actor
Jordan S. Jones
@jordansjones
Feb 11 2016 19:32
Does anyone have any further documentation they can point me to regarding the Json.Net serializer with the distributed publish/subscribe feature? Something more than what is on the website.
err.. The issues with the Json.Net serializer and the distributed publish/subscribe feature.
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 19:40
@jordansjones you need to download the Akka.Serialization.Wire package and bind it using HOCON config:
akka.actor.serializers.wire = "Akka.Serialization.WireSerializer, Akka.Serialization.Wire"
akka.actor.serialization-bindings {
    "System.Object" = wire
}
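A small sketch of applying that HOCON from code when the system is created, assuming the Akka.Serialization.Wire package is installed. The helper class and method names are hypothetical; inside a C# verbatim string each quote has to be doubled.

```csharp
using Akka.Actor;
using Akka.Configuration;

// Hypothetical helper; only the HOCON keys and type names come from the message above.
public static class WireSerializerSetup
{
    public static ActorSystem CreateSystem(string name)
    {
        // Verbatim string: "" inside it produces a single " in the parsed HOCON.
        var config = ConfigurationFactory.ParseString(@"
            akka.actor.serializers.wire = ""Akka.Serialization.WireSerializer, Akka.Serialization.Wire""
            akka.actor.serialization-bindings {
                ""System.Object"" = wire
            }");

        return ActorSystem.Create(name, config);
    }
}
```

The same block can also live in the application's configuration file, in which case the quotes are written once.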
Jordan S. Jones
@jordansjones
Feb 11 2016 19:41
@Horusiath Thanks. :) However, what I am looking for is what the issues are.
The documentation states that it is incompatible, but doesn’t mention what the incompatibilities are.
Bartosz Sypytkowski
@Horusiath
Feb 11 2016 19:44
tl;dr is that JSON.NET sucks ass and it's not able to correctly deserialize some of the structures used by distributed pub/sub
Jordan S. Jones
@jordansjones
Feb 11 2016 19:46
@Horusiath lol.. Thanks :)
Arjen Smits
@Danthar
Feb 11 2016 20:12
@Horusiath if we had a quotes wall of fame, that one would go up there ^^
Stuart Cam
@codebrain
Feb 11 2016 22:27
@Horusiath - Thanks!