saepul ramdani
@msaepulr_twitter
@Aaronontheweb my question is actually: can we modify or add some process when spawning an actor? (e.g. set its properties)
AndreSteenbergen
@AndreSteenbergen
@Horusiath @alexvaluyskiy @bobanco @marcpiechura What are your thoughts on AkkaNetContrib/Alpakka#39
Ebere Abanonu
@eaba
@Aaronontheweb @Horusiath how can we better handle skipped ordering numbers in the event journal?
I have had issues of this kind: 1,2,3,4,5,109,110
Once this happens, no actor gets any messages unless the tables are dropped and recreated
Bartosz Sypytkowski
@Horusiath
@eaba if something like this happens it's a faulty behavior
@AndreSteenbergen I'll try to respond after work
Ebere Abanonu
@eaba
@Horusiath sure, how can this faulty behavior be handled? It is a serious issue if that happens in production: users will wait endlessly for a response
Shukhrat Nekbaev
@snekbaev
Question about RunTask(). Is it safe to assume that it is a wrapper for calling async operations from UntypedActor or regular Receive methods? I mean, will it block the thread internally until the result is received, in contrast to async/await, which lets the thread go?
Shukhrat Nekbaev
@snekbaev
And are there downsides to using RunTask vs PipeTo? The former seems to simplify the code a bit.
For example, is it safe to modify the actor's internal state from within RunTask, or are race conditions possible:
private void HandleRunTask( RunTask runTask)
{
    _counter++;

    Console.WriteLine( "Before RunTask" );

    RunTask
        (
         async () =>
         {
             _counter++;

             Console.WriteLine( $"Counter: {_counter}" );

             Console.WriteLine( "Before await" );

             await Task.Delay( 5000 ).ConfigureAwait( false );

             Console.WriteLine( "After await" );
         } );

    Console.WriteLine( "After RunTask" );
}
Shukhrat Nekbaev
@snekbaev
at least sending two messages to the actor above seems to indicate that they are still processed sequentially
Vasily Kirichenko
@vasily-kirichenko
@snekbaev
public static void RunTask(Func<Task> asyncAction)
{
    var context = ActorCell.Current;

    if (context == null)
        throw new InvalidOperationException("RunTask must be called from an actor context.");

    var dispatcher = context.Dispatcher;

    //suspend the mailbox
    dispatcher.Suspend(context);

    ActorTaskScheduler actorScheduler = context.TaskScheduler;
    actorScheduler.CurrentMessage = context.CurrentMessage;

    Task<Task>.Factory.StartNew(asyncAction, CancellationToken.None, TaskCreationOptions.None, actorScheduler)
                      .Unwrap()
                      .ContinueWith(parent =>
                      {
                          Exception exception = GetTaskException(parent);

                          if (exception == null)
                          {
                              dispatcher.Resume(context);

                              context.CheckReceiveTimeout();
                          }
                          else
                          {
                              context.Self.AsInstanceOf<IInternalActorRef>().SendSystemMessage(new ActorTaskSchedulerMessage(exception, actorScheduler.CurrentMessage));
                          }
                          //clear the current message field of the scheduler
                          actorScheduler.CurrentMessage = null;
                      }, actorScheduler);
}
so the mailbox is suspended and the lambda (task) is executed on the actor's task scheduler
I think it's safe to modify the actor state, because that scheduler (ActorTaskScheduler) declares:
public override int MaximumConcurrencyLevel
{
    get { return 1; }
}
Shukhrat Nekbaev
@snekbaev
@vasily-kirichenko thank you!
Shukhrat Nekbaev
@snekbaev
Question about ConsistentHashingPool. The eventual goal is to have one actor per IConsistentHashable, with no reuse. It seems that ConsistentHashingPool requires an instance count, which means actors will be reused, which in turn means I can't keep state in the actor on the assumption that it is per, say, UserId. In that case I think I'm better off manually creating an actor for each UserId and storing those in a Dictionary or similar. Or is it possible to actually tell ConsistentHashingPool to have one actor per UserId? I did see a Resizer option but wasn't sure whether that was it.
Shukhrat Nekbaev
@snekbaev

A follow up would be:

public sealed class CHRChildActor : UntypedActor
{
    private int? processedUserId;
    private static int instanceCounter = 0;

    protected override void PreStart()
    {
        Console.WriteLine( Self.Path );

        base.PreStart();

        instanceCounter++;
    }

    protected override void OnReceive( object message )
    {
        message.Match().With<Work>( HandleWork )
               .Default( Unhandled );
    }

    private void HandleWork(Work work)
    {
        Console.WriteLine( $"Work received for userId {work.UserId}" );
        Console.WriteLine( $"Instance counter: {instanceCounter}" );

        if (processedUserId == null)
        {
            processedUserId = work.UserId;
        }

        if (processedUserId != work.UserId)
        {
            throw new Exception( "Being reused!" );
        }
    }

    public sealed class Work : IConsistentHashable
    {
        public Work(int userId)
        {
            UserId = userId;
        }

        public int UserId { get; }
        public object ConsistentHashKey => UserId;
    }
}

Basically, it throws an exception if it is being reused. Now the parent sends 5 messages:

for (var i = 0; i < 5; i++)
{
    _child.Tell( new CHRChildActor.Work( i ) );
}

and the way _child actor is created:

_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ) );
Vasily Kirichenko
@vasily-kirichenko
what is instanceCounter value?
Ismael Hamed
@ismaelhamed
@snekbaev, AFAIK consistent hashing will give you a range of UserIds per hash
Shukhrat Nekbaev
@snekbaev

so, the pool size is 10, but only 5 messages are sent. I was expecting no collisions (and therefore no exception), but it does throw. Then I also tried setting the name:

_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ), "a" );

ran and got the exception, then I added one more letter to the name:

_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ), "ab" );

Ran and... got no exception. I was expecting one, but got none. I reran it 10 times: no exception. If I change the name to "a" or remove it, the exception is there; if I use "ab", no exception. I'm afraid to ask, but is there some weirdness going on here, or am I missing something?

Vasily Kirichenko
@vasily-kirichenko
turn on debug logs and look at which actors are created and when
Shukhrat Nekbaev
@snekbaev
@vasily-kirichenko instanceCounter is a poor man's way to count PreStart. Basically if actor is not restarted I know how many instances are there :)
Vasily Kirichenko
@vasily-kirichenko
I meant what is its value :)
Shukhrat Nekbaev
@snekbaev
the child actor above has Console.WriteLine calls, so I can see the names of the actors when they are created; the console output is out of order though
normal run:
akka://Test/user/$a/ab/$c
akka://Test/user/$a/ab/$b
akka://Test/user/$a/ab/$h
akka://Test/user/$a/ab/$i
akka://Test/user/$a/ab/$j
akka://Test/user/$a/ab/$f
akka://Test/user/$a/ab/$g
CHRParentActor - PreStart
akka://Test/user/$a/ab/$e
akka://Test/user/$a/ab/$a
akka://Test/user/$a/ab/$d
Work received for userId 0
Instance counter: 10
Work received for userId 3
Instance counter: 10
Work received for userId 1
Instance counter: 10
Work received for userId 2
Instance counter: 10
Work received for userId 4
Instance counter: 10
crash:
akka://Test/user/$a/a/$a
akka://Test/user/$a/a/$g
akka://Test/user/$a/a/$i
akka://Test/user/$a/a/$h
akka://Test/user/$a/a/$j
akka://Test/user/$a/a/$e
akka://Test/user/$a/a/$c
akka://Test/user/$a/a/$b
akka://Test/user/$a/a/$f
akka://Test/user/$a/a/$d
CHRParentActor - PreStart
Work received for userId 0
Instance counter: 10
Work received for userId 2
Instance counter: 10
Work received for userId 1
Instance counter: 10
Work received for userId 3
Instance counter: 10
Work received for userId 4
Instance counter: 10
akka://Test/user/$a/a/$e
akka://Test/user/$a/a/$d
akka://Test/user/$a/a/$a
akka://Test/user/$a/a/$h
akka://Test/user/$a/a/$i
akka://Test/user/$a/a/$b
akka://Test/user/$a/a/$c
akka://Test/user/$a/a/$f
akka://Test/user/$a/a/$j
akka://Test/user/$a/a/$g
[ERROR][23.7.2018 11:27:27][Thread 0005][akka://Test/user/$a/a] Being reused!
Cause: System.Exception: Being reused!
   at TestActors.ConsistentHashRouting.CHRChildActor.HandleWork(Work work) in c:\Workspace\c#\akka.net\TestActors\ConsistentHashRouting\CHRChildActor.cs:line 40
   at Akka.Case.With[TMessage](Action`1 action)
   at TestActors.ConsistentHashRouting.CHRChildActor.OnReceive(Object message) in 
REDACTED
Bartosz Sypytkowski
@Horusiath
@snekbaev you can always Context.Watch over your actors - if they are killed (i.e. as a result of not being able to replay their journal) they will notify watchers with a Terminated message. Btw. if this happens in production, it means that your event journal has been corrupted and can no longer serve as a reliable source of data - in that case waiting for the response is the least of your problems.
Shukhrat Nekbaev
@snekbaev
this is a test app I'm using to learn Akka :) and to test things faster :)

btw, I ran into some quirks with:

 message.Match().With<Work>( HandleCrash )
                   .Default( Unhandled );

the pattern matching API; I'd assume it is not meant to be used by end users? Yesterday I was "watch"ing and waiting for a "Terminated" message but never got it, though I was getting unhandled messages. When I checked, it was some Akka.Case which wrapped the "Terminated" message. Because of that I reworked the message handler into a switch

Shukhrat Nekbaev
@snekbaev
message.Match().With<SEChildChildActor.Crash>( HandleCrash )
       .Match().With<Terminated>(HandleTerminated)
       .Default( o =>
                 {
                     Console.WriteLine( o );
                     Unhandled(o);
                 } );
became:
switch (message)
{
    case SEChildChildActor.Crash crash:
        HandleCrash( crash );
        break;

    case Terminated terminated:
        HandleTerminated(terminated);
        break;

    default:
        Unhandled( message );
        break;
}
Vasily Kirichenko
@vasily-kirichenko
I believe ConsistentHashingPool guarantees that messages with the same key are routed to the same actor (but it does not guarantee that there is a single actor per key). It seems it keeps a ring of actors and selects one by index.
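A minimal sketch of that idea in plain C#, not Akka.NET's actual implementation: the `HashRing` class, its `NodeFor` method, the `routee-N` names, and the FNV-1a hash are all assumptions made for illustration. It shows that a key always lands on the same node, while nothing stops two different keys from sharing a node, even when there are more nodes than keys.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration only; Akka.NET's ConsistentHashingPool
// uses its own ring and hash function.
public sealed class HashRing
{
    // Ring positions in ascending order, each mapped to the node that owns it.
    private readonly SortedDictionary<uint, string> _ring = new SortedDictionary<uint, string>();

    public HashRing(IEnumerable<string> nodes, int virtualNodesPerNode = 1)
    {
        foreach (var node in nodes)
            for (var v = 0; v < virtualNodesPerNode; v++)
                _ring[Hash(node + "#" + v)] = node;
    }

    // Walk clockwise from the key's hash to the first ring position,
    // wrapping around to the start if the end of the ring is passed.
    public string NodeFor(object key)
    {
        var h = Hash(key.ToString());
        foreach (var entry in _ring)
            if (entry.Key >= h)
                return entry.Value;
        return _ring.First().Value;
    }

    // FNV-1a over the string's characters: deterministic across runs,
    // unlike string.GetHashCode() on .NET Core.
    private static uint Hash(string s)
    {
        unchecked
        {
            uint hash = 2166136261u;
            foreach (var c in s)
            {
                hash ^= c;
                hash *= 16777619u;
            }
            return hash;
        }
    }
}

public static class HashRingDemo
{
    public static void Main()
    {
        var ring = new HashRing(Enumerable.Range(0, 10).Select(i => "routee-" + i));
        for (var userId = 0; userId < 5; userId++)
            Console.WriteLine($"userId {userId} -> {ring.NodeFor(userId)}");
    }
}
```

Whether five keys collide on ten nodes depends entirely on where the hashes fall on the ring, so a pool is not a way to get one stateful actor per UserId.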
Shukhrat Nekbaev
@snekbaev
@vasily-kirichenko any ideas why not-naming/short-naming the pool affects that rotation?
Vasily Kirichenko
@vasily-kirichenko
no
Vasily Kirichenko
@vasily-kirichenko
so, I don't think the resizer can be used at all
because if the number of actors grows, messages with the same key may be routed to different actors
Shukhrat Nekbaev
@snekbaev
yeah, I'd probably go with a list/dictionary and create actors per "some id"
Vasily Kirichenko
@vasily-kirichenko
let me check some books on this pool
Shukhrat Nekbaev
@snekbaev
btw, what is the common practice for that: say I want an actor per user. I can create an actor for each user and prefix the actor name with the userId. Then, if needed, I can use ActorSelection to talk to it directly. Another way is to have a list in some UserManagerActor, which maps each userId to an IActorRef. But in that case, to talk to a child UserActor I will need to route through the UserManagerActor
Vasily Kirichenko
@vasily-kirichenko
ActorSelection would be too slow I'm afraid
it should be avoided in most cases
Bartosz Sypytkowski
@Horusiath
@snekbaev if you're using clusters, akka cluster sharding gives you exactly that
Vasily Kirichenko
@vasily-kirichenko
yeah, but I'm afraid he does not use cluster.
Bartosz Sypytkowski
@Horusiath
then a simple user coordinator can work
Vasily Kirichenko
@vasily-kirichenko
which holds a userid -> actorref dictionary?
I mean using ActorSelection is not good for this
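A minimal sketch of such a coordinator, assuming Akka.NET's `ReceiveActor` API. The `UserEnvelope`, `UserActor`, and `UserCoordinator` types and the `user-<id>` naming are hypothetical names chosen for illustration, not something from the discussion above.

```csharp
using System.Collections.Generic;
using Akka.Actor;

// Hypothetical message: anything addressed to a specific user carries that user's id.
public sealed class UserEnvelope
{
    public UserEnvelope(int userId, object payload)
    {
        UserId = userId;
        Payload = payload;
    }

    public int UserId { get; }
    public object Payload { get; }
}

// Hypothetical per-user actor; its state belongs to exactly one user.
public sealed class UserActor : ReceiveActor
{
    public UserActor(int userId)
    {
        ReceiveAny(msg => Sender.Tell($"user {userId} got {msg}"));
    }
}

// Hypothetical coordinator: owns the userId -> IActorRef map and routes by id.
public sealed class UserCoordinator : ReceiveActor
{
    private readonly Dictionary<int, IActorRef> _users = new Dictionary<int, IActorRef>();

    public UserCoordinator()
    {
        Receive<UserEnvelope>(env =>
        {
            if (!_users.TryGetValue(env.UserId, out var user))
            {
                // Create the child on first use and watch it so the map can be cleaned up.
                user = Context.ActorOf(Props.Create(() => new UserActor(env.UserId)), "user-" + env.UserId);
                Context.Watch(user);
                _users[env.UserId] = user;
            }

            // Forward keeps the original sender, so the child can reply directly.
            user.Forward(env.Payload);
        });

        Receive<Terminated>(t =>
        {
            // Drop a stopped child; the next message for that user creates a fresh actor.
            foreach (var pair in _users)
            {
                if (pair.Value.Equals(t.ActorRef))
                {
                    _users.Remove(pair.Key);
                    break;
                }
            }
        });
    }
}
```

Callers only need the coordinator's `IActorRef`, so no `ActorSelection` lookup is involved; the cost is one extra hop through the coordinator per message.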