These are chat archives for akkadotnet/akka.net

11th
Sep 2016
Vagif Abilov
@object
Sep 11 2016 08:05
@bobanco What if the message consumed from the queue needs to be handled by a chain of actors, and the last one has to ack it on success, while in case of failure some intermediate actor may nack it. Can such scenario be implemented?
Boban
@bobanco
Sep 11 2016 11:55
@object yes, also you can use async inside of the message handler
Boban
@bobanco
Sep 11 2016 12:02
also if there is an exception, you can provide what kind of strategy you want to be applied, by default it will retry the same message 5 times with the specified timeouts, and also u can apply what strategy u want when max retry is reached, by default it's drop strategy. u can create your own strategy instead of drop and log the message/exception etc, or even put to another queue.
also more important is that you don't care about creating channels/connections, declaring queues/exchanges, auto reconnection etc
the library is doing all of that work, you just simply need to provide info like what kind of exchange you will use, what's the queue name and how u will handle the message
Vagif Abilov
@object
Sep 11 2016 12:59
@bobanco this sounds very interesting, would very much like to look at it. Especially interested in examples of applying it to actor chains like I mentioned above. E.g. an actor A1 consumes a message from the queue, then it goes via the change A1 -> A2 - > ... -> An, and it's only An that acks the message, but either of actors can nack it.
Boban
@bobanco
Sep 11 2016 14:31
you can apply to actour chains but you will have to use the async handler, and also inside of the handler u would await for the response of your chains, if the response is negative u can nack it or if positive ack, it depends on your implementation, also u might want to use the streams instead of expressive message handlers, but like i said before i haven't used them so much, as it's hard to know at which stage the processing fails so imagine if you have in a middle of the procesing stage calling third party service the control of resuming at that stage it's pretty hard, like the author of the library said in the specification, in such case each processing stage should use CQRS and persist it's events so it can resume later from that place
but i would say also that i do not have extensive knoweladge abt the streams, mb there is some other way to handle that
@Silv3rcircl3 and @Horusiath might have better suggestions regarding the streams
Vagif Abilov
@object
Sep 11 2016 16:17
Thanks @bobanco
Although I am not sure that awating in actor chain conforms the actor model philosophy.
Perhaps streams can help with this.
Bartosz Sypytkowski
@Horusiath
Sep 11 2016 16:48
@object @bobanco it looks like op-rabbit on akka JVM has a streams support
Boban
@bobanco
Sep 11 2016 21:57

@Horusiath yes it does, but like i have said before i haven't used the streams addon, as i remeber at that time when i was porting the library to .net, there was no way to use the recovery strategy with the stream, also there is a blog post from the author The Need for Acknowledgement in Streams and he has mentioned the following restrictions:

The provided Akka Stream Broadcast component is incompatible with message acknowledgement; combining an AckedSource with this component would be an error.
Combining an AckedSource to an Akka Flow or Sink is also an error.
In an Akka FlowGraph, when you combine a Source with a Flow, you get a FlowGraph.Implicits
PortOps. PortOps implements FlowOps and provides stream manipulation operations that are incompatible with an acknowledged Flow; using them is error-prone.

Yin Zhang
@melcloud
Sep 11 2016 22:11
Tip of the day, don't try to subscribe UnHandledMessage, and put a predicate on it...
 public class MessageLogger : ReceiveActor
    {
        private readonly ILoggingAdapter _logger;

        /// <summary>
        /// Construct a <see cref="MessageLogger"/>.
        /// </summary>
        /// <param name="loggerFactory">The logger factory.</param>
        public MessageLogger(Func<IActorContext, ILoggingAdapter> loggerFactory)
        {
            Condition.Requires(loggerFactory, nameof(loggerFactory)).IsNotNull();

            _logger = loggerFactory(Context);

            Receive();
        }

        /// <summary>
        /// Subscribe to the event stream for unhandled messages.
        /// </summary>
        protected override void PreStart()
        {
            Context.System.EventStream.Subscribe(this.Self, typeof(UnhandledMessage));

            base.PreStart();
        }

        /// <summary>
        /// Receive any <see cref="IAmMessage"/> and log it.
        /// </summary>
        private void Receive()
        {
          // So horribly wrong here 
            Receive<UnhandledMessage>(
                msg => _logger.Warning("Message from '{Sender}' to '{Recipient}' is not handled: {Message}", msg.Sender, msg.Recipient, msg.Message),
                msg => msg.Message is IAmMessage);
        }
Arsene Tochemey GANDOTE
@Tochemey
Sep 11 2016 22:25
                  var contentStream = response.Content.ReadAsStreamAsync();
                    try
                    {
                        contentStream.Wait(TimeSpan.FromSeconds(1));
                        return new ImageDownloadResult(image, response.StatusCode, contentStream.Result);
                    }
                    catch //timeout exceptions!
                    {
                        return new ImageDownloadResult(image, HttpStatusCode.PartialContent);
                    }
Can I do this await response.Content.ReadAsStreamAsync? I am asking this because it is recommended not to use async await in Actor receive.
Yin Zhang
@melcloud
Sep 11 2016 22:29
You can do await, but await means suspend the mailbox, which may have a performance impact depends on your requirement
Arsene Tochemey GANDOTE
@Tochemey
Sep 11 2016 22:31
So what is the difference between the await and what was suggested on the github?
because We still suspend the mailbox in the code above. Maybe I am not getting it very well.
Yin Zhang
@melcloud
Sep 11 2016 22:33
@Tochemey If you are not using await, I would suggest you to put the ImageDownloadResult into a continueWith of ReadAsStreamAsync() then pipeTo the destination actor (e.g. Self)
@Tochemey E.g.
var contentStream = response.Content.ReadAsStreamAsync().ContinueWith(t => 
{
    if (t.IsCancelled || t.IsFault)
    {
         return new ImageDownloadResult(image, HttpStatusCode.PartialContent);
    }

          return new ImageDownloadResult(image, response.StatusCode, t.Result);
});
Arsene Tochemey GANDOTE
@Tochemey
Sep 11 2016 22:38
good
This message was deleted
tx
Corneliu
@corneliutusnea
Sep 11 2016 22:47
Guys, can Akka cluster nodes talk to each other VIA the lighthouse? e.g. If I have connectivity from C1>LH and C2>LH but not C1>C2 ?
Yin Zhang
@melcloud
Sep 11 2016 22:56
@corneliutusnea I think you need to customise Lighthouse a little bit, as long as lighthouse can inform C2 where the ClusterReceipientEndpoint is. See akka cluster client doc, http://getakka.net/docs/clustering/cluster-client#akka-cluster-client and http://letitcrash.com/post/56134720289/22-spotlight-cluster-client
Corneliu
@corneliutusnea
Sep 11 2016 23:47
@melcloud Docs say this Also don't use cluster client to communicate between actors inside the same cluster, as it's not a suitable tool for this job :( I want to use it to communicate between clients in the same cluster