dependabot[bot] on nuget
Bump MicrosoftLibVersion from 5… (compare)
dependabot[bot] on nuget
Bump System.Configuration.Confi… (compare)
dependabot[bot] on nuget
Bump Microsoft.Extensions.Hosti… (compare)
dependabot[bot] on nuget
Bump Microsoft.Extensions.Hosti… (compare)
dependabot[bot] on nuget
Bump BenchmarkDotNet from 0.13.… (compare)
Hi, question about actor discovery for local actor system (no remoting).
Common service actors are located in a single ServicesActor:
/user/ServicesActor/Service1
/user/ServicesActor/Service2
/user/ServicesActor/ServiceN
Some logic per instance in a single SomeManager:
/user/SomeManager/Instance1/SomeOrchestrator/Worker1
/user/SomeManager/Instance1/SomeOrchestrator/Worker2
/user/SomeManager/Instance1/SomeOrchestrator/WorkerN
...
/user/SomeManager/InstanceN/SomeOrchestrator/Worker1..N
I need to send messages from "WorkerX" actors to Service1... ServiceN actors. What is the best way to resolve service actors from worker actors?
One solution is to use "ActorPaths" class recommendation, but in this case I'm ending up with ActorSelection in-game, which is not recommended (https://getakka.net/articles/actors/untyped-actor-api.html) for local actor system, though definitely would simplify the discovery. Second option: pass ServiceActor IActorRef to SomeManager, send a message to ServiceActor something like "RequestAllServiceRefs" which will reply with a DTO that contains all actorrefs for all services, then pass that reply into constructor of "InstanceX" actor which in turn will pass it into constructor of "SomeOrchestrator" actor which will pass it to all child "WorkerX" actors. Quite verbose and complicates the initialization of the SomeManager (can't create child actors in PreStart, because reply is not received yet).
What is the best way to achieve this?
IStash
is settable
private void HandleRunTask( RunTask runTask)
{
_counter++;
Console.WriteLine( "Before RunTask" );
RunTask
(
async () =>
{
_counter++;
Console.WriteLine( $"Counter: {_counter}" );
Console.WriteLine( "Before await" );
await Task.Delay( 5000 ).ConfigureAwait( false );
Console.WriteLine( "After await" );
} );
Console.WriteLine( "After RunTask" );
}
public static void RunTask(Func<Task> asyncAction)
{
var context = ActorCell.Current;
if (context == null)
throw new InvalidOperationException("RunTask must be called from an actor context.");
var dispatcher = context.Dispatcher;
//suspend the mailbox
dispatcher.Suspend(context);
ActorTaskScheduler actorScheduler = context.TaskScheduler;
actorScheduler.CurrentMessage = context.CurrentMessage;
Task<Task>.Factory.StartNew(asyncAction, CancellationToken.None, TaskCreationOptions.None, actorScheduler)
.Unwrap()
.ContinueWith(parent =>
{
Exception exception = GetTaskException(parent);
if (exception == null)
{
dispatcher.Resume(context);
context.CheckReceiveTimeout();
}
else
{
context.Self.AsInstanceOf<IInternalActorRef>().SendSystemMessage(new ActorTaskSchedulerMessage(exception, actorScheduler.CurrentMessage));
}
//clear the current message field of the scheduler
actorScheduler.CurrentMessage = null;
}, actorScheduler);
}
public override int MaximumConcurrencyLevel
{
get { return 1; }
}
A follow up would be:
public sealed class CHRChildActor : UntypedActor
{
private int? processedUserId;
private static int instanceCounter = 0;
protected override void PreStart()
{
Console.WriteLine( Self.Path );
base.PreStart();
instanceCounter++;
}
protected override void OnReceive( object message )
{
message.Match().With<Work>( HandleWork )
.Default( Unhandled );
}
private void HandleWork(Work work)
{
Console.WriteLine( $"Work received for userId {work.UserId}" );
Console.WriteLine( $"Instance counter: {instanceCounter}" );
if (processedUserId == null)
{
processedUserId = work.UserId;
}
if (processedUserId != work.UserId)
{
throw new Exception( "Being reused!" );
}
}
public sealed class Work : IConsistentHashable
{
public Work(int userId)
{
UserId = userId;
}
public int UserId { get; }
public object ConsistentHashKey => UserId;
}
}
basically it throws exception if being reused, now, parent sends 5 messages
for (var i = 0; i < 5; i++)
{
_child.Tell( new CHRChildActor.Work( i ) );
}
and the way _child actor is created:
_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ) );
so, size is 10, but 5 messages are sent. I was expecting it not to collide, i.e. throw exception, but it does, then I tried also setting the name:
_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ), "a" );
ran and got the exception, then I added one more letter to the name:
_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ), "ab" );
Ran and... got no exception. I was expecting one, but got none. Rerun 10 times, no exception. If I change to "a" or remove the name - exception is there, if I have "ab" - no exception. I'm afraid to ask, but is there some weirdness going on in here or am I missing something?