Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Activity
  • 08:57
    michelvosje opened #6382
  • 07:55
    beachwalker commented #6378
  • 07:39
    beachwalker synchronize #6378
  • Feb 02 21:27
    Arkatufus synchronize #172
  • Feb 02 21:09
    CumpsD commented #172
  • Feb 02 17:01
    dependabot[bot] synchronize #6285
  • Feb 02 17:01
    dependabot[bot] synchronize #6240
  • Feb 02 17:01
    dependabot[bot] synchronize #6241
  • Feb 02 17:01

    dependabot[bot] on nuget

    Bump MicrosoftLibVersion from 5… (compare)

  • Feb 02 17:01
    dependabot[bot] synchronize #6237
  • Feb 02 17:01
    dependabot[bot] synchronize #6351
  • Feb 02 17:01

    dependabot[bot] on nuget

    Bump System.Configuration.Confi… (compare)

  • Feb 02 17:01

    dependabot[bot] on nuget

    Bump Microsoft.Extensions.Hosti… (compare)

  • Feb 02 17:01

    dependabot[bot] on nuget

    Bump Microsoft.Extensions.Hosti… (compare)

  • Feb 02 17:01

    dependabot[bot] on nuget

    Bump BenchmarkDotNet from 0.13.… (compare)

  • Feb 02 17:01
    dependabot[bot] edited #6285
  • Feb 02 17:01
    dependabot[bot] edited #6241
  • Feb 02 17:01
    dependabot[bot] edited #6240
  • Feb 02 17:01
    dependabot[bot] edited #6237
  • Feb 02 17:01
    dependabot[bot] edited #6351
Ian Patterson
@IanPattersonMuo
Has anyone any experience of using Protocol Buffers for Sharding ? and if so how did they structure their Envelope and MessageExtractor code ? My first pass attempt is to create an Envelope that takes Google.Protobuf.IMessage and then packs them into an Any field. This makes the MessageExtractor more tricky as the type has to be unpacked before being returned. Is there a supported way of doing this ?
Arjen Smits
@Danthar
@snekbaev What a common pattern is, is to have a well known actor in your system. Which acts as a manager for said ServiceActors. Which the others actors know about. If they want to get a reference to a specific serviceactor, they send a message to that manager, and the manager responds to that by sending them the proper service actor. Or telling them that service actor does not exists or something.
alternatively if you want to send a message to ALL service actors. you can use a grouprouter. Or actorselection wildcards.
chipdice
@chipdice
How do I receive notification when a ClusterSingleton has taken over?
Shukhrat Nekbaev
@snekbaev
@Danthar yeah, that's what I was thinking of, that eventually I will have to use ActorSelection to hit that ServiceActor (which I actually have which serves the purpose of being a parent actor for all service actors). Then will have to create messages per requested child service actor and respond with it. Requesters will probably have to use Become (Initializing, Initialized) and wait for the response, meanwhile stash... or use "Ask" in PreStart. Still not sure, feels like I could just directly use child service actors via "ActorPaths" approach, imho, about the same, but less boilerplate... still on the edge on this one...
AndreSteenbergen
@AndreSteenbergen
Just a mind game, maybe someone would like to brainstorm with me. @alexvaluyskiy created the kafka stream connector. I would like to create a coordinator based on topic (I guess, maybe other ideas) which could create actors based on message key (if not existing) and sending the actor the message. How could we make the created actors aware when an actor could actually raise events of itself, publishing to the topic. Should the actors be allowed to do something like that? Like the PersistentActors?
  • Assuming we start consuming from the beginning
  • I am thinking about fsm actors, reacting on the eof message from kafka (become synced)
  • produce the message, and wait for the event to come by, or just assume a still synced state
Creating a persistent set af actors based on kafka queues. As kafka is quite a performant commit log and kafka can already be used as an eventsource if you would like to
saepul ramdani
@msaepulr_twitter
Can we assign actor's property like Stash do (implement an interface, and then property will be assigned upon actor instance creation) ?
Bartosz Sypytkowski
@Horusiath
@AndreSteenbergen try set github issue for your ideas. I'd like to join the conversation, but I'm not always able to respond right away - also gitter is horrible for tracking particular threads
@msaepulr_twitter you can now, but this API will be removed soon
if was badly designed
saepul ramdani
@msaepulr_twitter
@Horusiath I see, thanks for the answer
Aaron Stannard
@Aaronontheweb
@Horusiath yeah, the stashing API is not being redesigned anytime soon dude
I think you mean the thing that picks which stash to use
but to answer your question @msaepulr_twitter - the stash gets set when we spawn actors because the choice of stash impacts which mailbox gets used too
so even though IStash is settable
saepul ramdani
@msaepulr_twitter
@Aaronontheweb my question is actually: can we modify or add some process when spawning actor? (e.g set their property)
AndreSteenbergen
@AndreSteenbergen
@Horusiath @alexvaluyskiy @bobanco @marcpiechura What are your thoughts on AkkaNetContrib/Alpakka#39
Ebere Abanonu
@eaba
@Aaronontheweb @Horusiath how can we better handle skipped ordering number in eventjournal?
I have had issues with this kinds:1,2,3,4,5,109,110
once this happens no Actor gets any messages except only if the tables are dropped and recreated
Bartosz Sypytkowski
@Horusiath
@eaba if something like this happens it's a faulty behavior
@AndreSteenbergen I'll try to respond after work
Ebere Abanonu
@eaba
@Horusiath sure, how can this faulty behavior be handled? It is a serious issue if that happens in production, users will wait endlessly for response
Shukhrat Nekbaev
@snekbaev
Question about RunTask(). Is it safe to assume that it is a wrapper for calling async operations from UntypedActor or regular Receive methods? I mean, will it block the thread internally until result is received? In comparison to async/await which will let the thread go
Shukhrat Nekbaev
@snekbaev
And are there downsides to using RunTask vs PipeTo, former seems to simplify the code a bit?
For example, is it safe to modify actor's internal state from the RunTask, i.e. possible race conditions:
private void HandleRunTask( RunTask runTask)
{
    _counter++;

    Console.WriteLine( "Before RunTask" );

    RunTask
        (
         async () =>
         {
             _counter++;

             Console.WriteLine( $"Counter: {_counter}" );

             Console.WriteLine( "Before await" );

             await Task.Delay( 5000 ).ConfigureAwait( false );

             Console.WriteLine( "After await" );
         } );

    Console.WriteLine( "After RunTask" );
}
Shukhrat Nekbaev
@snekbaev
at least sending two messages to the actor above seem to indicate that they are still processed sequentially
Vasily Kirichenko
@vasily-kirichenko
@snekbaev
public static void RunTask(Func<Task> asyncAction)
        {
            var context = ActorCell.Current;

            if (context == null)
                throw new InvalidOperationException("RunTask must be called from an actor context.");

            var dispatcher = context.Dispatcher;

            //suspend the mailbox
            dispatcher.Suspend(context);

            ActorTaskScheduler actorScheduler = context.TaskScheduler;
            actorScheduler.CurrentMessage = context.CurrentMessage;

            Task<Task>.Factory.StartNew(asyncAction, CancellationToken.None, TaskCreationOptions.None, actorScheduler)
                              .Unwrap()
                              .ContinueWith(parent =>
                              {
                                  Exception exception = GetTaskException(parent);

                                  if (exception == null)
                                  {
                                      dispatcher.Resume(context);

                                      context.CheckReceiveTimeout();
                                  }
                                  else
                                  {
                                      context.Self.AsInstanceOf<IInternalActorRef>().SendSystemMessage(new ActorTaskSchedulerMessage(exception, actorScheduler.CurrentMessage));
                                  }
                                  //clear the current message field of the scheduler
                                  actorScheduler.CurrentMessage = null;
                              }, actorScheduler);
        }
so the lambda (task) is executed on the actor scheduler
I think it's safe to modify the actor state.
public override int MaximumConcurrencyLevel
        {
            get { return 1; }
        }
Shukhrat Nekbaev
@snekbaev
@vasily-kirichenko thank you!
Shukhrat Nekbaev
@snekbaev
Question about ConsistentHashingPool. Eventual goal is to have one actor per IConsistentHashable, no reuse. It seems that ConsistentHashingPool requires instance count, which means it will be reused, which in turn means I can't have state in the actor assuming it is per, say UserId. In that case I think I'm better off by manually creating actors for each UserId and store those in Dictionary or so. Or is it possible to actually tell ConsisntentHashingPool to have one actor per UserId? I did see some Resizer option, wasn't sure it was that?
Shukhrat Nekbaev
@snekbaev

A follow up would be:

public sealed class CHRChildActor : UntypedActor
{
    private int? processedUserId;
    private static int instanceCounter = 0;

    protected override void PreStart()
    {
        Console.WriteLine( Self.Path );

        base.PreStart();

        instanceCounter++;
    }

    protected override void OnReceive( object message )
    {
        message.Match().With<Work>( HandleWork )
               .Default( Unhandled );
    }

    private void HandleWork(Work work)
    {
        Console.WriteLine( $"Work received for userId {work.UserId}" );
        Console.WriteLine( $"Instance counter: {instanceCounter}" );

        if (processedUserId == null)
        {
            processedUserId = work.UserId;
        }

        if (processedUserId != work.UserId)
        {
            throw new Exception( "Being reused!" );
        }
    }

    public sealed class Work : IConsistentHashable
    {
        public Work(int userId)
        {
            UserId = userId;
        }

        public int UserId { get; }
        public object ConsistentHashKey => UserId;
    }
}

basically it throws exception if being reused, now, parent sends 5 messages

for (var i = 0; i < 5; i++)
{
    _child.Tell( new CHRChildActor.Work( i ) );
}

and the way _child actor is created:

_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ) );
Vasily Kirichenko
@vasily-kirichenko
what is instanceCounter value?
Ismael Hamed
@ismaelhamed
@snekbaev, AFAIK consistent hashing will give you a range of UserIds per hash
Shukhrat Nekbaev
@snekbaev

so, size is 10, but 5 messages are sent. I was expecting it not to collide, i.e. throw exception, but it does, then I tried also setting the name:

_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ), "a" );

ran and got the exception, then I added one more letter to the name:

_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ), "ab" );

Ran and... got no exception. I was expecting one, but got none. Rerun 10 times, no exception. If I change to "a" or remove the name - exception is there, if I have "ab" - no exception. I'm afraid to ask, but is there some weirdness going on in here or am I missing something?

Vasily Kirichenko
@vasily-kirichenko
turn on debug logs and look at what actors and when are created
Shukhrat Nekbaev
@snekbaev
@vasily-kirichenko instanceCounter is a poor man's way to count PreStart. Basically if actor is not restarted I know how many instances are there :)
Vasily Kirichenko
@vasily-kirichenko
I meant what is its value :)
Shukhrat Nekbaev
@snekbaev
the child actor above, it has Console.Log, I can see the names of actors when they are created, order of console output is out of order though
normal run:
akka://Test/user/$a/ab/$c
akka://Test/user/$a/ab/$b
akka://Test/user/$a/ab/$h
akka://Test/user/$a/ab/$i
akka://Test/user/$a/ab/$j
akka://Test/user/$a/ab/$f
akka://Test/user/$a/ab/$g
CHRParentActor - PreStart
akka://Test/user/$a/ab/$e
akka://Test/user/$a/ab/$a
akka://Test/user/$a/ab/$d
Work received for userId 0
Instance counter: 10
Work received for userId 3
Instance counter: 10
Work received for userId 1
Instance counter: 10
Work received for userId 2
Instance counter: 10
Work received for userId 4
Instance counter: 10
crash:
akka://Test/user/$a/a/$a
akka://Test/user/$a/a/$g
akka://Test/user/$a/a/$i
akka://Test/user/$a/a/$h
akka://Test/user/$a/a/$j
akka://Test/user/$a/a/$e
akka://Test/user/$a/a/$c
akka://Test/user/$a/a/$b
akka://Test/user/$a/a/$f
akka://Test/user/$a/a/$d
CHRParentActor - PreStart
Work received for userId 0
Instance counter: 10
Work received for userId 2
Instance counter: 10
Work received for userId 1
Instance counter: 10
Work received for userId 3
Instance counter: 10
Work received for userId 4
Instance counter: 10
akka://Test/user/$a/a/$e
akka://Test/user/$a/a/$d
akka://Test/user/$a/a/$a
akka://Test/user/$a/a/$h
akka://Test/user/$a/a/$i
akka://Test/user/$a/a/$b
akka://Test/user/$a/a/$c
akka://Test/user/$a/a/$f
akka://Test/user/$a/a/$j
akka://Test/user/$a/a/$g
[ERROR][23.7.2018 11:27:27][Thread 0005][akka://Test/user/$a/a] Being reused!
Cause: System.Exception: Being reused!
   at TestActors.ConsistentHashRouting.CHRChildActor.HandleWork(Work work) in c:\Workspace\c#\akka.net\TestActors\ConsistentHashRouting\CHRChildActor.cs:line 40
   at Akka.Case.With[TMessage](Action`1 action)
   at TestActors.ConsistentHashRouting.CHRChildActor.OnReceive(Object message) in 
REDACTED
Bartosz Sypytkowski
@Horusiath
@snekbaev you can always Context.Watch over your actors - if they are killed (i.e. as a result of not being albe to replay they journal) they will notify watchers with Terminated message. Btw. if this happens in production, it means that your event journal has been corrupted and can no longer serve as a reliable source of data - in that case waiting for the response is the least of your problem.
Shukhrat Nekbaev
@snekbaev
this is a test app I'm using to learn Akka :) and to test things faster :)

btw, I ran into some quirks with:

 message.Match().With<Work>( HandleCrash )
                   .Default( Unhandled );

the pattern matching API, I'd assume it is not meant to be used by end users? Yesterday, I was "watch"ing and waiting for "Terminated" message, but never got it, though was getting unhandled messages, when checked, it was some Akka.Case which wrapped the "Terminated" message. Because of that I reworked the message handler to switch

Shukhrat Nekbaev
@snekbaev
message.Match().With<SEChildChildActor.Crash>( HandleCrash )
       .Match().With<Terminated>(HandleTerminated)
       .Default( o =>
                 {
                     Console.WriteLine( o );
                     Unhandled(o);
                 } );
became:
switch (message)
{
    case SEChildChildActor.Crash crash:
        HandleCrash( crash );
        break;

    case Terminated terminated:
        HandleTerminated(terminated);
        break;

    default:
        Unhandled( message );
        break;
}