Can anyone help me get my head around the implications of leveraging persistence in an existing actor? I've changed the base class and am moving some "commands" over to the construct event, persist, process event pattern, but I'm getting what "feels" like asynchronous processing of the events. Is it true to say that:
1) A message is received
2) Message is treated as a command and an event is created from the command
3) Persist is called supplying the event to persist and a handler to be invoked after successful persistence
4) Next message is pulled from the mailbox and processed
5) Persistence completes
6) Handler supplied in 3 is invoked with persisted event
That's how it feels, so I wonder whether that is accurate, and whether the fix is to finish moving over from messages to Commands/Events, after which it'll be okay.
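A minimal sketch of the pattern being described, assuming the actor derives from ReceivePersistentActor in Akka.Persistence. The Deposit command, Deposited event, AccountActor class and the "account-1" id are hypothetical names made up for illustration. As far as I understand the Akka.Persistence documentation, Persist stashes incoming commands until its handler has run, so steps 4 and 5 above should not interleave for Persist; they can interleave with PersistAsync. What does run "early" is any code placed after the Persist call inside the same command handler.

```csharp
using Akka.Actor;
using Akka.Persistence;

// Hypothetical command and event types, for illustration only.
public sealed class Deposit
{
    public Deposit(decimal amount) { Amount = amount; }
    public decimal Amount { get; }
}

public sealed class Deposited
{
    public Deposited(decimal amount) { Amount = amount; }
    public decimal Amount { get; }
}

public class AccountActor : ReceivePersistentActor
{
    private decimal _balance;

    // Assumed fixed id; a real actor would usually derive this from its entity.
    public override string PersistenceId => "account-1";

    public AccountActor()
    {
        // Replayed events rebuild the state on recovery.
        Recover<Deposited>(e => Apply(e));

        Command<Deposit>(cmd =>
        {
            var evt = new Deposited(cmd.Amount);

            // Persist returns immediately; the handler runs once the journal
            // confirms the write. Commands arriving in between are stashed.
            Persist(evt, persisted =>
            {
                Apply(persisted);
                Sender.Tell(_balance);
            });

            // Anything written here executes BEFORE the handler above,
            // which can look like out-of-order processing.
            // PersistAsync(evt, Apply) would additionally allow the next
            // command to be processed before the handler runs.
        });
    }

    private void Apply(Deposited e) => _balance += e.Amount;
}
```

If state changes and replies live only inside the Persist handler, the ordering should match the command order; messages still handled outside the command/event path are the likelier source of the "asynchronous" feel.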
async methods which return data (to an Ask) and PipeTo: would using ContinueWith on faulted tasks be a reasonable way to 'catch' and handle exceptions thrown? Example:
Receive<SomeRequest>(request =>
{
    GetSomethingAsync(request)
        .ContinueWith(t => new FailedResponse(t.Exception), TaskContinuationOptions.OnlyOnFaulted)
        .PipeTo(Sender);
});
PipeTo if the task succeeds normally I don't think PipeTo is continuing off of the OnlyOnFaulted Task
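To check the concern about OnlyOnFaulted, here is a small sketch that uses only the .NET task library, with no Akka involved. OnlyOnFaultedDemo and Wrap are hypothetical names, and Wrap stands in for the GetSomethingAsync(...).ContinueWith(...) chain from the example. It shows that when the antecedent task succeeds, the OnlyOnFaulted continuation is canceled rather than run, so the successful result never reaches whatever is chained after it.

```csharp
using System;
using System.Threading.Tasks;

public static class OnlyOnFaultedDemo
{
    // Hypothetical stand-in for the ContinueWith chain in the question.
    public static Task<object> Wrap(Task<string> source) =>
        source.ContinueWith<object>(
            t => "failed: " + t.Exception.InnerException.Message,
            TaskContinuationOptions.OnlyOnFaulted);

    public static void Main()
    {
        // Antecedent succeeds: the continuation does not run and is canceled.
        var onSuccess = Wrap(Task.FromResult("ok"));
        try { onSuccess.Wait(); } catch (AggregateException) { /* wraps TaskCanceledException */ }
        Console.WriteLine(onSuccess.IsCanceled); // True

        // Antecedent faults: the continuation runs and produces the failure value.
        var onFailure = Wrap(Task.FromException<string>(new InvalidOperationException("boom")));
        Console.WriteLine(onFailure.Result); // failed: boom
    }
}
```

A continuation without OnlyOnFaulted that inspects IsFaulted or IsCanceled itself, and returns either the result or a failure message, avoids losing the success case.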
// pipe the result of our markets to ourselves
_exchangeFeedClient.HttpClient.Markets.GetProducts(cts.Token).ContinueWith<object>(tr =>
{
    if (tr.IsCanceled || tr.IsFaulted)
        return new Status.Failure(tr.Exception);
    return tr.Result;
}).PipeTo(Self);
Receive<Status.Failure>(s =>
{
    _currentRetryPolicy = _currentRetryPolicy.Retry(); // update the retry data
    _log.Warning(
        "Request for list of exchange trade products timed out from {0}. Retrying in {1} seconds with {2} attempts remaining.",
        _exchangeFeedClient.Endpoints.RestEndpointUri, _currentRetryPolicy.CurrentRetryInterval,
        _currentRetryPolicy.RemainingAttempts);
    AcquireProductsFeed(_currentRetryPolicy.CurrentRetryInterval);
}, failure => _currentRetryPolicy.CanRetry);
Receive<Status.Failure>(s =>
{
    _log.Error("ERROR: unable to contact exchange at {0} after {1} attempts. Aborting.",
        _exchangeFeed.HttpClient.Endpoints.RestEndpointUri, _retryPolicyProvider.MaxNrOfAttempts);
    _currentRetryPolicy.Retry(); // throw a HopelessOperation exception on purpose
}, failure => !_currentRetryPolicy.CanRetry);
// Received data from the exchange API.
Receive<ApiResult<IEnumerable<Product>>>(products =>
{
    if (products.Status == HttpStatusCode.OK)
    {
        _tradeProducts.AddRange(products.Result);
        _log.Info("Received products from exchange: {0}", string.Join(",", _tradeProducts.Select(x => x.id)));
        BecomePublishing();
    }
    else if (_currentRetryPolicy.CanRetry) // something went wrong with our web request, but we have retry attempts remaining
    {
        _currentRetryPolicy = _currentRetryPolicy.Retry(); // update the retry data
        _log.Warning(
            "Received error with our HTTP request ({0}): {1}. Retrying in {2} with {3} attempts left.",
            products.Status, products.Message,
            _currentRetryPolicy.CurrentRetryInterval, _currentRetryPolicy.RemainingAttempts);
        AcquireProductsFeed(_currentRetryPolicy.CurrentRetryInterval);
    }
    else // web request failed, no retry attempts remaining
    {
        _log.Error(
            "ERROR: unable to contact exchange at {0} after {1} attempts. HTTP Status: {2}, Message: {3}. Aborting.",
            _exchangeFeed.HttpClient.Endpoints.RestEndpointUri,
            _retryPolicyProvider.MaxNrOfAttempts, products.Status, products.Message);
        _currentRetryPolicy.Retry(); // throw a HopelessOperation exception on purpose
    }
});
AcquireProductsFeed(_currentRetryPolicy.CurrentRetryInterval); causes me to re-run the task that failed
Persist
method
PersistAsync
Hi all, I'm trying to build a client for a remote Akka system using an association. I'm using this test:
public class AssociationErrorTest
{
    public class EchoActor : ReceiveActor
    {
        public EchoActor() { Receive<object>(m => Sender.Tell(m)); }
    }

    [Fact]
    public async Task Test_association()
    {
        var server = ActorSystem.Create("server", @"akka { actor.provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
            remote.dot-netty.tcp { port = 10001
            hostname = localhost}}");
        var serverActor = server.ActorOf<EchoActor>(nameof(EchoActor));
        var client = ActorSystem.Create("client", @"akka{ actor.provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
            remote.dot-netty.tcp { port = 0
            hostname = localhost}}");
        var remoteActorRef = await client.ActorSelection(@"akka.tcp://server@localhost:10001/user/EchoActor").ResolveOne(TimeSpan.FromSeconds(5));
        await remoteActorRef.Ask<string>("Hello");
    }
}
If I run it as a single test, all is OK. But if I run it with a bunch of tests, the actor selection hangs. I got this error from the internals:
17-10-05 17:50:08.366 [DBG TH193] Src:[remoting
"Associated [akka.tcp://server@localhost:10001] <- akka.tcp://client@localhost:5703"
17-10-05 17:50:08.375 [DBG TH201] Src:[endpointWriter#116268940]
"Associated [akka.tcp://client@localhost:5703] -> akka.tcp://server@localhost:10001"
17-10-05 17:50:08.375 [DBG TH201] Src:[endpointWriter#116268940]
Drained buffer with maxWriteCount: 50, fullBackoffCount: 1,smallBackoffCount: 0, noBackoffCount: 0,adaptiveBackoff: 1000
17-10-05 17:50:08.388 [DBG TH201] Src:[client)
Resolve of path sequence [/"temp/N"] failed
It looks like the client system cannot locate the temp actor during Ask().
Can anybody suggest reasons? Maybe I need some additional configuration?