dependabot-preview[bot] on nuget
Bump Google.Protobuf from 3.15.… (compare)
var cluster = Cluster.Get(system);
cluster.Subscribe(refOfSomeActorYouMadeToListenToEvents,typeof(ClusterEvent.MemberDowned)); //may want others too
var props = actorRefFactory.DI().Props<NotificationCoordinatorActor>();
var supervisor = PersistenceSupervisor.PropsFor(
(msg, confId) =>
{
return msg switch
{
NotificationCoordinatorActor.Subscribe sub => new NotificationCoordinatorActor.ConfirmableSubscribe(confId, string.Empty, sub),
NotificationCoordinatorActor.Unsubscribe unsub => new NotificationCoordinatorActor.ConfirmableUnsubscribe(confId, string.Empty, unsub),
NotificationCoordinatorActor.Notify notify => new NotificationCoordinatorActor.ConfirmableNotify(confId, string.Empty, notify),
_ => new ConfirmableMessageEnvelope(confId, string.Empty, msg)
};
},
msg => true,
props,
"notification-coordinator-actor",
strategy: SupervisorStrategy.StoppingStrategy.WithMaxNrOfRetries(100));
ActorRef = actorRefFactory.SingletonActorOf(supervisor, "notification-coordinator-actor");
Hello everyone. Quick question on Akka.Streams. I wanted to test out a RestartFlow and made a simple example. Something along the lines:
using var system = ActorSystem.Create("system");
using var materializer = system.Materializer();
var source = Source.From(Enumerable.Range(1, 10));
await source .Via(RestartFlow.OnFailuresWithBackoff(
() => Flow.FromFunction<int, int>(i => i == 2 ? throw new Exception("boom") : i),
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(10),
0))
.RunForeach(i => Console.WriteLine(i), materializer);
The problem now is, that after the Exception was thrown the RestartFlow throws a "Cannot pull a closed port" Exception. Can someone explain to me why? Or if I miss something.
Thanks in advance
Hello!
Is it possible that the BackoffSupervisor parameter description is not correct?
The doc says:
The following C# snippet shows how to create a backoff supervisor which will start the given echo actor after it has stopped because of a failure, in increasing intervals of ... seconds
But in a scenario, where the child Actor has a delay, I run into a problem.
I am supervising a child with BackoffSupervisor.OnStop. The ChildActor tries to establish a connection. If it doesn't manage to do that in a certain timeout, then the actor is stopped.
Parentvar supervisor = BackoffSupervisor.Props(
Backoff.OnStop(
childProps,
childName: "myEcho",
minBackoff: TimeSpan.FromMilliSeconds(500),
maxBackoff: TimeSpan.FromSeconds(30),
randomFactor: 0.2,
maxNoOfRetries: 3));
ChildContext.System.Tcp().Tell(new Tcp.Connect(endpoint, null,null, TimeSpan.FromMilliseconds(1000)));
(Stops on TCP ErrorMessage Received)
I would interpret the doku that if the child stops, the next start will restart after the interval expires.
So in my case the sequence should look like this:
0ms: Initial start
1000ms Child timeout => Context.Stop(Self)
1500ms: first retry (+500ms)
2500ms: Child timeout => Context.Stop(Self)
3500ms: second retry (+1000ms)
4500ms: Child timeout => Context.Stop(Self)
6500ms: third retry (+2000ms)
7500ms: Terminate Child
Is this correct?
But If I set the minBackoff time in the BackoffSupervisor smaller than the timeout in the childactor, then the backoff does not work.
The child is restarted immediately and maxNrOfRetries is also not respected.
What am I doing wrong? What do I miss?
Regards
Michael
I found in our logs the following warning:
Configured Total of Connection timeout (15 seconds) and Command timeout (30 seconds) is less than or equal to Circuit breaker timeout (10 seconds). This may cause unintended write failures
I think two things need to be corrected here.
@Aaronontheweb This is a minor detail but can be confusing. If you agree with my interpretation, I can send a PR.
@object I peeked at some of your issues akkadotnet/akka.net#4265 and akkadotnet/Akka.Persistence.SqlServer#190
ThreadPool.GetAvailableThreads(out int avWt, out int avIot);
ThreadPool.GetMaxThreads(out int maxWt, out int maxIot);
var (workerThreads,ioThreads)= (maxWt - avWt,maxIot-avIot); //log this every second
If you can enable MVCC (i.e. Enable snapshot Isolation and default isolation level READ_COMMITTED_SNAPSHOT) that could help
Might be worth logging the SQL executed in the chunk
Mayyyybe try adding some code in the batching process so that if there's only READ commands in the batch, a transaction isn't used.
If you're feeling adventurous, you may want to try (it is technically prerelease state) the Linq2db plugin which is based on akka-persistence-jdbc. Code is the branch in akkadotnet/Akka.Persistence.Linq2Db#10 and I have it as a nuget package here. Configuring for 'compatibility mode' (Where it should be both backward and forward compatible with Existing Journal/snapshot) is exampled here. I've been running the journal in not-quite-prod (i.e. syncing data with old system) for a few months now with great success
TestScheduler
has to be manually advanced