IStash is settable
private void HandleRunTask( RunTask runTask )
{
    _counter++;
    Console.WriteLine( "Before RunTask" );
    RunTask
    (
        async () =>
        {
            _counter++;
            Console.WriteLine( $"Counter: {_counter}" );
            Console.WriteLine( "Before await" );
            await Task.Delay( 5000 ).ConfigureAwait( false );
            Console.WriteLine( "After await" );
        } );
    Console.WriteLine( "After RunTask" );
}
public static void RunTask(Func<Task> asyncAction)
{
    var context = ActorCell.Current;
    if (context == null)
        throw new InvalidOperationException("RunTask must be called from an actor context.");
    var dispatcher = context.Dispatcher;
    //suspend the mailbox
    dispatcher.Suspend(context);
    ActorTaskScheduler actorScheduler = context.TaskScheduler;
    actorScheduler.CurrentMessage = context.CurrentMessage;
    Task<Task>.Factory.StartNew(asyncAction, CancellationToken.None, TaskCreationOptions.None, actorScheduler)
        .Unwrap()
        .ContinueWith(parent =>
        {
            Exception exception = GetTaskException(parent);
            if (exception == null)
            {
                dispatcher.Resume(context);
                context.CheckReceiveTimeout();
            }
            else
            {
                context.Self.AsInstanceOf<IInternalActorRef>().SendSystemMessage(new ActorTaskSchedulerMessage(exception, actorScheduler.CurrentMessage));
            }
            //clear the current message field of the scheduler
            actorScheduler.CurrentMessage = null;
        }, actorScheduler);
}
public override int MaximumConcurrencyLevel
{
    get { return 1; }
}
A follow up would be:
public sealed class CHRChildActor : UntypedActor
{
    private int? processedUserId;
    private static int instanceCounter = 0;
    protected override void PreStart()
    {
        Console.WriteLine( Self.Path );
        base.PreStart();
        instanceCounter++;
    }
    protected override void OnReceive( object message )
    {
        message.Match().With<Work>( HandleWork )
            .Default( Unhandled );
    }
    private void HandleWork(Work work)
    {
        Console.WriteLine( $"Work received for userId {work.UserId}" );
        Console.WriteLine( $"Instance counter: {instanceCounter}" );
        if (processedUserId == null)
        {
            processedUserId = work.UserId;
        }
        if (processedUserId != work.UserId)
        {
            throw new Exception( "Being reused!" );
        }
    }
    public sealed class Work : IConsistentHashable
    {
        public Work(int userId)
        {
            UserId = userId;
        }
        public int UserId { get; }
        public object ConsistentHashKey => UserId;
    }
}
Basically, it throws an exception if an instance is reused for a different userId. Now the parent sends 5 messages:
for (var i = 0; i < 5; i++)
{
    _child.Tell( new CHRChildActor.Work( i ) );
}
and this is how the _child actor is created:
_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ) );
So the pool size is 10, but only 5 messages are sent. I was expecting no collisions, i.e. no exception, but it does throw. Then I also tried setting the name:
_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ), "a" );
I ran it and got the exception. Then I added one more letter to the name:
_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ), "ab" );
Ran it and... got no exception. I was expecting one, but got none. I reran it 10 times: no exception. If I change the name to "a" or remove it, the exception is there; with "ab" there is no exception. I'm afraid to ask, but is there some weirdness going on here, or am I missing something?
akka://Test/user/$a/ab/$c
akka://Test/user/$a/ab/$b
akka://Test/user/$a/ab/$h
akka://Test/user/$a/ab/$i
akka://Test/user/$a/ab/$j
akka://Test/user/$a/ab/$f
akka://Test/user/$a/ab/$g
CHRParentActor - PreStart
akka://Test/user/$a/ab/$e
akka://Test/user/$a/ab/$a
akka://Test/user/$a/ab/$d
Work received for userId 0
Instance counter: 10
Work received for userId 3
Instance counter: 10
Work received for userId 1
Instance counter: 10
Work received for userId 2
Instance counter: 10
Work received for userId 4
Instance counter: 10
akka://Test/user/$a/a/$a
akka://Test/user/$a/a/$g
akka://Test/user/$a/a/$i
akka://Test/user/$a/a/$h
akka://Test/user/$a/a/$j
akka://Test/user/$a/a/$e
akka://Test/user/$a/a/$c
akka://Test/user/$a/a/$b
akka://Test/user/$a/a/$f
akka://Test/user/$a/a/$d
CHRParentActor - PreStart
Work received for userId 0
Instance counter: 10
Work received for userId 2
Instance counter: 10
Work received for userId 1
Instance counter: 10
Work received for userId 3
Instance counter: 10
Work received for userId 4
Instance counter: 10
akka://Test/user/$a/a/$e
akka://Test/user/$a/a/$d
akka://Test/user/$a/a/$a
akka://Test/user/$a/a/$h
akka://Test/user/$a/a/$i
akka://Test/user/$a/a/$b
akka://Test/user/$a/a/$c
akka://Test/user/$a/a/$f
akka://Test/user/$a/a/$j
akka://Test/user/$a/a/$g
[ERROR][23.7.2018 11:27:27][Thread 0005][akka://Test/user/$a/a] Being reused!
Cause: System.Exception: Being reused!
at TestActors.ConsistentHashRouting.CHRChildActor.HandleWork(Work work) in c:\Workspace\c#\akka.net\TestActors\ConsistentHashRouting\CHRChildActor.cs:line 40
at Akka.Case.With[TMessage](Action`1 action)
at TestActors.ConsistentHashRouting.CHRChildActor.OnReceive(Object message) in
REDACTED
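A minimal sketch of why 5 keys can collide on 10 routees, and why the parent name can matter. It assumes the router places routees on a hash ring keyed on their path strings; `SimpleHashRing`, its FNV-1a hash and the virtual-node count are hypothetical stand-ins, not Akka.NET's actual implementation. A hash ring gives "same key, same routee", not "different key, different routee", and renaming the parent changes every routee path and therefore every ring position.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical, simplified hash ring; NOT Akka.NET's ConsistentHash implementation.
public sealed class SimpleHashRing
{
    private readonly uint[] _positions;   // sorted ring positions
    private readonly Dictionary<uint, string> _nodeAt = new Dictionary<uint, string>();

    public SimpleHashRing(IEnumerable<string> nodes, int virtualNodes = 10)
    {
        foreach (var node in nodes)
            for (var i = 0; i < virtualNodes; i++)
                _nodeAt[Fnv1a(node + "#" + i)] = node;   // the node name decides its ring positions
        if (_nodeAt.Count == 0)
            throw new ArgumentException("At least one node is required.", nameof(nodes));
        _positions = _nodeAt.Keys.OrderBy(p => p).ToArray();
    }

    // Route a key to the first node at or after the key's hash, wrapping around the ring.
    public string NodeFor(object key)
    {
        var hash = Fnv1a(key.ToString());
        var index = Array.BinarySearch(_positions, hash);
        if (index < 0) index = ~index;               // index of the next larger position
        if (index == _positions.Length) index = 0;   // wrap around
        return _nodeAt[_positions[index]];
    }

    // Deterministic 32-bit FNV-1a hash, so results do not vary between processes.
    private static uint Fnv1a(string text)
    {
        unchecked
        {
            var hash = 2166136261u;
            foreach (var b in Encoding.UTF8.GetBytes(text))
            {
                hash ^= b;
                hash *= 16777619u;
            }
            return hash;
        }
    }
}

public static class HashRingDemo
{
    public static void Main()
    {
        foreach (var parent in new[] { "a", "ab" })
        {
            // Routee paths shaped like the ones in the log output above.
            var routees = Enumerable.Range(0, 10)
                .Select(i => "akka://Test/user/$a/" + parent + "/$" + (char)('a' + i));
            var ring = new SimpleHashRing(routees);
            var distinct = Enumerable.Range(0, 5)
                .Select(id => ring.NodeFor(id))
                .Distinct()
                .Count();
            Console.WriteLine($"parent '{parent}': 5 keys landed on {distinct} distinct routees");
        }
    }
}
```

Whether a given name produces a collision depends entirely on the hash values, so a stable "always throws" or "never throws" result per name is what this kind of scheme would be expected to give.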
Context.Watch over your actors - if they are killed (e.g. as a result of not being able to replay their journal), they will notify watchers with a Terminated message. Btw, if this happens in production, it means that your event journal has been corrupted and can no longer serve as a reliable source of data - in that case waiting for the response is the least of your problems.
Btw, I ran into some quirks with:
message.Match().With<Work>( HandleCrash )
.Default( Unhandled );
the pattern matching API. I'd assume it is not meant to be used by end users? Yesterday I was watching an actor and waiting for the Terminated message, but never got it, although I was getting unhandled messages. When I checked, the unhandled message was an Akka.Case that wrapped the Terminated message. Because of that I reworked the message handler into a switch statement, so this:
message.Match().With<SEChildChildActor.Crash>( HandleCrash )
    .Match().With<Terminated>(HandleTerminated)
    .Default( o =>
    {
        Console.WriteLine( o );
        Unhandled(o);
    } );
became:
switch (message)
{
    case SEChildChildActor.Crash crash:
        HandleCrash( crash );
        break;
    case Terminated terminated:
        HandleTerminated(terminated);
        break;
    default:
        Unhandled( message );
        break;
}
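One plausible explanation for the wrapped Akka.Case, sketched with a hypothetical stand-in rather than the Akka.NET source: if Match() is an extension method on object and With returns the case object, then calling .Match() a second time in the middle of the chain wraps the case itself as the new "message". `MiniCase`, `MiniMatch` and `MatchDemo` below are illustrative names only.

```csharp
using System;

// Hypothetical stand-in for a fluent matcher; NOT the Akka.NET implementation.
public sealed class MiniCase
{
    private readonly object _message;
    private bool _handled;

    public MiniCase(object message) { _message = message; }

    // Runs the action if the message has type T and nothing has handled it yet.
    public MiniCase With<T>(Action<T> action)
    {
        if (!_handled && _message is T typed)
        {
            action(typed);
            _handled = true;
        }
        return this;
    }

    // Runs the fallback only when no With<T> matched.
    public void Default(Action<object> action)
    {
        if (!_handled) action(_message);
    }
}

public static class MiniMatch
{
    // Extension on object, so it also applies to a MiniCase instance.
    public static MiniCase Match(this object target) => new MiniCase(target);
}

public static class MatchDemo
{
    // Second Match() mid-chain: the MiniCase from the first With becomes the message.
    public static string Chained(object message)
    {
        string seen = "";
        message.Match().With<int>(i => seen = "int:" + i)
               .Match().With<string>(s => seen = s)
               .Default(o => seen = o.GetType().Name);
        return seen;
    }

    // Single Match() followed by chained With calls: the original message is matched.
    public static string Fixed(object message)
    {
        string seen = "";
        message.Match().With<int>(i => seen = "int:" + i)
               .With<string>(s => seen = s)
               .Default(o => seen = o.GetType().Name);
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(Chained("stopped"));   // prints "MiniCase"
        Console.WriteLine(Fixed("stopped"));     // prints "stopped"
    }
}
```

Under that assumption, dropping the second .Match() from the chain would also have worked; the switch statement avoids the pitfall altogether.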