Bartosz Sypytkowski
@Horusiath
@bruno-alencar if the Receive handler is of type Action<T>, then yes, you can make it async (just like any method returning void), but it won't be thread safe. Basically your actor will start processing a message and then immediately pick up the next one without waiting for the previous one to finish. Essentially you'll end up with race conditions.
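A minimal sketch of the safer alternative, assuming Akka.NET's ReceiveActor and its ReceiveAsync<T> overload. The Fetch message and SafeAsyncActor class are hypothetical names used only for illustration, and Task.Delay stands in for real async work.

```csharp
using System.Threading.Tasks;
using Akka.Actor;

// Hypothetical message type, not from the discussion.
public sealed class Fetch { }

public sealed class SafeAsyncActor : ReceiveActor
{
    private int _completed; // actor state modified after the await

    public SafeAsyncActor()
    {
        // Risky variant (not used here): Receive<Fetch>(async _ => { ... })
        // binds to Action<Fetch>, i.e. async void, so the actor moves on to
        // the next message at the first await.

        // ReceiveAsync keeps the mailbox suspended until the returned Task
        // completes, so messages are still handled one at a time.
        ReceiveAsync<Fetch>(async _ =>
        {
            var replyTo = Sender;  // capture before awaiting, to be safe
            await Task.Delay(100); // stand-in for real async I/O
            _completed++;
            replyTo.Tell(_completed);
        });
    }
}
```

The trade-off is throughput: while the task is pending, the actor processes nothing else.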
Vagif Abilov
@object
@matneyx I believe @rustamm is right. When people ask you to solve an interview puzzle, you shouldn't just apply your favourite technology but first try to guess what they want to get from you. The ability to solve the task using a minimal set of tools is a quality that is always appreciated.
matneyx
@matneyx
@rustamm @object -- I definitely considered that, but I read a few posts that said FileSystemWatcher occasionally missed updates and polling was a better option. There was a third option that ran both options, using polling to catch any that the FileSystemWatcher didn't catch, but I didn't give myself the time to implement that. ¯_(ツ)_/¯
Shukhrat Nekbaev
@snekbaev

Hi, question about actor discovery for local actor system (no remoting).

Common service actors are located in a single ServicesActor:
/user/ServicesActor/Service1
/user/ServicesActor/Service2
/user/ServicesActor/ServiceN

Some logic per instance in a single SomeManager:

/user/SomeManager/Instance1/SomeOrchestrator/Worker1
/user/SomeManager/Instance1/SomeOrchestrator/Worker2
/user/SomeManager/Instance1/SomeOrchestrator/WorkerN
...
/user/SomeManager/InstanceN/SomeOrchestrator/Worker1..N

I need to send messages from "WorkerX" actors to Service1... ServiceN actors. What is the best way to resolve service actors from worker actors?
One solution is to follow the "ActorPaths" class recommendation, but then I end up with ActorSelection in play, which is not recommended (https://getakka.net/articles/actors/untyped-actor-api.html) for a local actor system, though it would definitely simplify discovery. Second option: pass the ServiceActor IActorRef to SomeManager and send ServiceActor a message such as "RequestAllServiceRefs", to which it replies with a DTO that contains the actor refs of all services. That reply is then passed into the constructor of the "InstanceX" actor, which in turn passes it into the constructor of the "SomeOrchestrator" actor, which passes it to all child "WorkerX" actors. This is quite verbose and complicates the initialization of SomeManager (it can't create child actors in PreStart, because the reply has not been received yet).

What is the best way to achieve this?

Arjen Smits
@Danthar
@snekbaev you could do a couple of things. Passing around the entire list of available service actors is not recommended, because how will you detect if that list has changed?
You could have the worker actor communicate with the ServicesActor to acquire the IActorRef of the service actor you want. ActorSelection is also an option. It is not recommended as your default way of doing things, because it's slower than working directly with IActorRefs.
But the tradeoff is yours to make :)
Shukhrat Nekbaev
@snekbaev
@Danthar list is a simple DTO in that case which will contain all the services that exist, probably simpler than having one per service:
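public sealed class ServiceList
{
    // One constructor parameter and one read-only property per service.
    public ServiceList(IActorRef service1, IActorRef service2, IActorRef serviceN)
    {
        Service1 = service1;
        Service2 = service2;
        ServiceN = serviceN;
    }

    public IActorRef Service1 { get; }
    public IActorRef Service2 { get; }
    public IActorRef ServiceN { get; }
}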
--You could have the workeractor communicate with the servicesactor to acquire the IActorRef of the Service actor you want
In that case we are back to the ActorSelection which I was trying to avoid, but at the same time, given that my case is not so complicated, I started wondering how to properly pass those references (the recommendations from the aforementioned link). Because it didn't look very straightforward/clean, I thought maybe there's something wrong with my design; however, it doesn't seem to be way off from examples I've seen. Eventually, it seems I will have to cook something based on ActorSelection :)
Ian Patterson
@IanPattersonMuo
Does anyone have any experience of using Protocol Buffers for Sharding? If so, how did you structure your Envelope and MessageExtractor code? My first-pass attempt is to create an Envelope that takes a Google.Protobuf.IMessage and packs it into an Any field. This makes the MessageExtractor more tricky, as the type has to be unpacked before being returned. Is there a supported way of doing this?
Arjen Smits
@Danthar
@snekbaev A common pattern is to have a well-known actor in your system which acts as a manager for said service actors and which the other actors know about. If they want to get a reference to a specific service actor, they send a message to that manager, and the manager responds by sending them the proper service actor, or by telling them that the service actor does not exist.
Alternatively, if you want to send a message to ALL service actors, you can use a group router or ActorSelection wildcards.
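The manager pattern described above can be sketched roughly as follows, assuming the service actors are direct children of the manager and are looked up by name. All message and class names here (GetService, ServiceFound, ServiceNotFound, EchoService) are hypothetical and not part of Akka.NET.

```csharp
using Akka.Actor;

// Hypothetical lookup protocol.
public sealed class GetService
{
    public GetService(string name) { Name = name; }
    public string Name { get; }
}

public sealed class ServiceFound
{
    public ServiceFound(string name, IActorRef service) { Name = name; Service = service; }
    public string Name { get; }
    public IActorRef Service { get; }
}

public sealed class ServiceNotFound
{
    public ServiceNotFound(string name) { Name = name; }
    public string Name { get; }
}

// Placeholder service that echoes whatever it receives.
public sealed class EchoService : ReceiveActor
{
    public EchoService() { ReceiveAny(msg => Sender.Tell(msg)); }
}

// Well-known manager: parent of all service actors.
public sealed class ServicesActor : ReceiveActor
{
    public ServicesActor()
    {
        Receive<GetService>(msg =>
        {
            // Context.Child returns Nobody when no child has that name.
            var child = Context.Child(msg.Name);
            if (child.IsNobody())
                Sender.Tell(new ServiceNotFound(msg.Name));
            else
                Sender.Tell(new ServiceFound(msg.Name, child));
        });
    }

    protected override void PreStart()
    {
        Context.ActorOf(Props.Create<EchoService>(), "Service1");
    }
}
```

A worker would send GetService once, keep the returned IActorRef, and talk to the service directly from then on. How the worker obtains the manager's own reference (constructor argument or a one-time ActorSelection) is left open here.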
chipdice
@chipdice
How do I receive notification when a ClusterSingleton has taken over?
Shukhrat Nekbaev
@snekbaev
@Danthar yeah, that's what I was thinking of, that eventually I will have to use ActorSelection to hit that ServiceActor (which I actually have which serves the purpose of being a parent actor for all service actors). Then will have to create messages per requested child service actor and respond with it. Requesters will probably have to use Become (Initializing, Initialized) and wait for the response, meanwhile stash... or use "Ask" in PreStart. Still not sure, feels like I could just directly use child service actors via "ActorPaths" approach, imho, about the same, but less boilerplate... still on the edge on this one...
AndreSteenbergen
@AndreSteenbergen
Just a mind game, maybe someone would like to brainstorm with me. @alexvaluyskiy created the kafka stream connector. I would like to create a coordinator based on topic (I guess, maybe other ideas) which could create actors based on message key (if not existing) and sending the actor the message. How could we make the created actors aware when an actor could actually raise events of itself, publishing to the topic. Should the actors be allowed to do something like that? Like the PersistentActors?
  • Assuming we start consuming from the beginning
  • I am thinking about fsm actors, reacting on the eof message from kafka (become synced)
  • produce the message, and wait for the event to come by, or just assume a still synced state
The idea is to create a persistent set of actors based on Kafka queues, since Kafka is quite a performant commit log and can already be used as an event source if you would like to.
saepul ramdani
@msaepulr_twitter
Can we assign an actor's property the way Stash does (implement an interface, and then the property is assigned upon actor instance creation)?
Bartosz Sypytkowski
@Horusiath
@AndreSteenbergen try opening a GitHub issue for your ideas. I'd like to join the conversation, but I'm not always able to respond right away. Also, Gitter is horrible for tracking particular threads.
@msaepulr_twitter you can now, but this API will be removed soon
it was badly designed
saepul ramdani
@msaepulr_twitter
@Horusiath I see, thanks for the answer
Aaron Stannard
@Aaronontheweb
@Horusiath yeah, the stashing API is not being redesigned anytime soon dude
I think you mean the thing that picks which stash to use
but to answer your question @msaepulr_twitter - the stash gets set when we spawn actors because the choice of stash impacts which mailbox gets used too
so even though IStash is settable
saepul ramdani
@msaepulr_twitter
@Aaronontheweb my question is actually: can we modify or add some process when spawning actor? (e.g set their property)
AndreSteenbergen
@AndreSteenbergen
@Horusiath @alexvaluyskiy @bobanco @marcpiechura What are your thoughts on AkkaNetContrib/Alpakka#39
Ebere Abanonu
@eaba
@Aaronontheweb @Horusiath how can we better handle skipped ordering number in eventjournal?
I have had issues of this kind: 1, 2, 3, 4, 5, 109, 110
Once this happens, no actor gets any messages unless the tables are dropped and recreated
Bartosz Sypytkowski
@Horusiath
@eaba if something like this happens it's a faulty behavior
@AndreSteenbergen I'll try to respond after work
Ebere Abanonu
@eaba
@Horusiath sure, how can this faulty behavior be handled? It is a serious issue if that happens in production, users will wait endlessly for response
Shukhrat Nekbaev
@snekbaev
Question about RunTask(). Is it safe to assume that it is a wrapper for calling async operations from UntypedActor or regular Receive methods? I mean, will it block the thread internally until result is received? In comparison to async/await which will let the thread go
Shukhrat Nekbaev
@snekbaev
And are there downsides to using RunTask vs PipeTo, former seems to simplify the code a bit?
For example, is it safe to modify actor's internal state from the RunTask, i.e. possible race conditions:
private void HandleRunTask( RunTask runTask)
{
    _counter++;

    Console.WriteLine( "Before RunTask" );

    RunTask
        (
         async () =>
         {
             _counter++;

             Console.WriteLine( $"Counter: {_counter}" );

             Console.WriteLine( "Before await" );

             await Task.Delay( 5000 ).ConfigureAwait( false );

             Console.WriteLine( "After await" );
         } );

    Console.WriteLine( "After RunTask" );
}
Shukhrat Nekbaev
@snekbaev
At least, sending two messages to the actor above seems to indicate that they are still processed sequentially
Vasily Kirichenko
@vasily-kirichenko
@snekbaev
public static void RunTask(Func<Task> asyncAction)
{
    var context = ActorCell.Current;

    if (context == null)
        throw new InvalidOperationException("RunTask must be called from an actor context.");

    var dispatcher = context.Dispatcher;

    //suspend the mailbox
    dispatcher.Suspend(context);

    ActorTaskScheduler actorScheduler = context.TaskScheduler;
    actorScheduler.CurrentMessage = context.CurrentMessage;

    Task<Task>.Factory.StartNew(asyncAction, CancellationToken.None, TaskCreationOptions.None, actorScheduler)
        .Unwrap()
        .ContinueWith(parent =>
        {
            Exception exception = GetTaskException(parent);

            if (exception == null)
            {
                dispatcher.Resume(context);

                context.CheckReceiveTimeout();
            }
            else
            {
                context.Self.AsInstanceOf<IInternalActorRef>().SendSystemMessage(new ActorTaskSchedulerMessage(exception, actorScheduler.CurrentMessage));
            }
            //clear the current message field of the scheduler
            actorScheduler.CurrentMessage = null;
        }, actorScheduler);
}
so the lambda (task) is executed on the actor scheduler
I think it's safe to modify the actor state.
public override int MaximumConcurrencyLevel
{
    get { return 1; }
}
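For comparison with RunTask, the PipeTo approach asked about earlier can be sketched as below. This is only an illustration under assumptions: the Start, Finished and GetCounter messages and the PipeToActor class are hypothetical, and Task.Delay stands in for real work. Unlike RunTask, PipeTo does not suspend the mailbox, so the task itself must not touch actor state; the result comes back as an ordinary message.

```csharp
using System.Threading.Tasks;
using Akka.Actor;

// Hypothetical messages.
public sealed class Start { }
public sealed class GetCounter { }
public sealed class Finished
{
    public Finished(int value) { Value = value; }
    public int Value { get; }
}

public sealed class PipeToActor : ReceiveActor
{
    private int _counter;

    public PipeToActor()
    {
        Receive<Start>(_ =>
        {
            _counter++; // safe: runs inside the actor

            // The task runs outside the actor and only produces a result,
            // which PipeTo delivers back to Self as a Finished message.
            DoWorkAsync().PipeTo(Self);
        });

        // State is modified here, back inside normal message processing.
        Receive<Finished>(done => _counter += done.Value);

        Receive<GetCounter>(_ => Sender.Tell(_counter));
    }

    // Must not read or write actor fields: other messages may be
    // processed while this is still running.
    private static async Task<Finished> DoWorkAsync()
    {
        await Task.Delay(100).ConfigureAwait(false);
        return new Finished(1);
    }
}
```

The extra message type is the boilerplate that RunTask avoids; in exchange, the actor stays responsive while the task is in flight.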
Shukhrat Nekbaev
@snekbaev
@vasily-kirichenko thank you!
Shukhrat Nekbaev
@snekbaev
Question about ConsistentHashingPool. The eventual goal is to have one actor per IConsistentHashable, with no reuse. It seems that ConsistentHashingPool requires an instance count, which means routees will be reused, which in turn means I can't keep state in the actor assuming it is per, say, UserId. In that case I think I'm better off manually creating an actor for each UserId and storing those in a Dictionary or so. Or is it possible to actually tell ConsistentHashingPool to have one actor per UserId? I did see some Resizer option, but wasn't sure whether that was it.
Shukhrat Nekbaev
@snekbaev

A follow up would be:

public sealed class CHRChildActor : UntypedActor
{
    private int? processedUserId;
    private static int instanceCounter = 0;

    protected override void PreStart()
    {
        Console.WriteLine( Self.Path );

        base.PreStart();

        instanceCounter++;
    }

    protected override void OnReceive( object message )
    {
        message.Match().With<Work>( HandleWork )
               .Default( Unhandled );
    }

    private void HandleWork(Work work)
    {
        Console.WriteLine( $"Work received for userId {work.UserId}" );
        Console.WriteLine( $"Instance counter: {instanceCounter}" );

        if (processedUserId == null)
        {
            processedUserId = work.UserId;
        }

        if (processedUserId != work.UserId)
        {
            throw new Exception( "Being reused!" );
        }
    }

    public sealed class Work : IConsistentHashable
    {
        public Work(int userId)
        {
            UserId = userId;
        }

        public int UserId { get; }
        public object ConsistentHashKey => UserId;
    }
}

basically it throws exception if being reused, now, parent sends 5 messages

for (var i = 0; i < 5; i++)
{
    _child.Tell( new CHRChildActor.Work( i ) );
}

and the way _child actor is created:

_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ) );
Vasily Kirichenko
@vasily-kirichenko
what is instanceCounter value?
Ismael Hamed
@ismaelhamed
@snekbaev, AFAIK consistent hashing will give you a range of UserIds per hash
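Since a pool routee serves a range of keys, the "one actor per UserId" idea from the question can be sketched with a parent that creates a named child on demand. This is a rough illustration, not a recommendation from the thread; UserManager, UserActor and this standalone Work message are hypothetical names.

```csharp
using Akka.Actor;

// Hypothetical message carrying the key.
public sealed class Work
{
    public Work(int userId) { UserId = userId; }
    public int UserId { get; }
}

// One instance per UserId, so per-user state is safe to keep here.
public sealed class UserActor : ReceiveActor
{
    private int _handled;

    public UserActor(int userId)
    {
        Receive<Work>(_ =>
        {
            _handled++;
            Sender.Tell(Self.Path.Name); // reply with this child's name
        });
    }
}

public sealed class UserManager : ReceiveActor
{
    public UserManager()
    {
        Receive<Work>(work =>
        {
            var userId = work.UserId;
            var name = "user-" + userId;

            // Look up the child by name; create it on first use.
            var child = Context.Child(name);
            if (child.IsNobody())
                child = Context.ActorOf(Props.Create(() => new UserActor(userId)), name);

            child.Forward(work); // keeps the original sender
        });
    }
}
```

Using the child name as the key avoids a separate Dictionary, because the parent's child collection already serves as the lookup table.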
Shukhrat Nekbaev
@snekbaev

so, size is 10, but 5 messages are sent. I was expecting it not to collide, i.e. throw exception, but it does, then I tried also setting the name:

_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ), "a" );

ran and got the exception, then I added one more letter to the name:

_child = Context.ActorOf( Props.Create( () => new CHRChildActor()).WithRouter( new ConsistentHashingPool( 10 ) ), "ab" );

Ran and... got no exception. I was expecting one, but got none. Rerun 10 times, no exception. If I change to "a" or remove the name - exception is there, if I have "ab" - no exception. I'm afraid to ask, but is there some weirdness going on in here or am I missing something?

Vasily Kirichenko
@vasily-kirichenko
turn on debug logs and look at which actors are created and when
Shukhrat Nekbaev
@snekbaev
@vasily-kirichenko instanceCounter is a poor man's way to count PreStart. Basically if actor is not restarted I know how many instances are there :)
Vasily Kirichenko
@vasily-kirichenko
I meant what is its value :)