Zetanova
@Zetanova
@evaldast yes, it's an architecture issue. You can optionally switch to Akka.Streams, but don't do it if you are currently low on time.
A simple approach would be to just aggregate the actions in Actor1 directly. Another approach would be to also change the behavior of Actor2 after the threshold is triggered, so Actor2 sends the rest of the user-action messages back to Actor1 or directly to Actor3.
evaldast
@evaldast
Thanks for the reply, yes, I was currently using the second approach you mentioned, wondered if there was a better way to go around this
Zetanova
@Zetanova
And the second approach has a side effect: you will lose the message ordering. If the aggregation process needs an ordered event stream (user login before logout), then it is better to make a simple class (not an actor) that handles the event messages; Actor1 creates an instance of it and processes the user messages with it:
userActionsAggregate.Handle(msg);  
if(userActionsAggregate.IsTriggered) {  Become(Triggered); }
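A minimal sketch of that plain-class approach, assuming events carry a numeric value and the trigger is a simple sum threshold. UserAction, its Value property, the threshold of 100, and the body of Triggered are hypothetical; only Handle, IsTriggered, and Become(Triggered) come from the snippet above.

```csharp
using Akka.Actor;

// Hypothetical event type; the chat does not show the real message shape.
public sealed class UserAction
{
    public UserAction(int value) { Value = value; }
    public int Value { get; }
}

// Plain class, not an actor: it only accumulates state for its owning actor.
public sealed class UserActionsAggregate
{
    private readonly int _threshold;
    private int _sum;

    public UserActionsAggregate(int threshold) { _threshold = threshold; }

    public bool IsTriggered => _sum >= _threshold;

    public void Handle(UserAction action) { _sum += action.Value; }
}

public class Actor1 : ReceiveActor
{
    // Assumed threshold, for illustration only.
    private readonly UserActionsAggregate _aggregate = new UserActionsAggregate(threshold: 100);

    public Actor1()
    {
        Receive<UserAction>(msg =>
        {
            // Messages are handled one at a time, in mailbox order.
            _aggregate.Handle(msg);
            if (_aggregate.IsTriggered)
            {
                Become(Triggered);
            }
        });
    }

    private void Triggered()
    {
        Receive<UserAction>(msg => { /* threshold reached: handle later actions here */ });
    }
}
```

Because the aggregate lives inside Actor1, it sees every message in the order Actor1 receives it, and no extra actor hop is involved.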
evaldast
@evaldast
No, the aggregate does not care about message order, it aggregates the values of events received and once the sum of a value is hit it moves to next step
I would not mind going with approach 1, where aggregating is done in the same actor that checks the current status. However, there is quite a bit of heavy lifting involved in the aggregation process, which I would not like to have in a single actor
Zetanova
@Zetanova
How do I run the Akka.Cluster.Tests.MultiNode tests locally? For me every test gets skipped
Inside VS2017
Aaron Stannard
@Aaronontheweb
@Zetanova I need to finish akkadotnet/akka.net#3901
but to run the build in-ones locally
.\build.cmd multinodetests
Alex Hofer
@alexhofer

I have two questions. I have a callback for a FileSystemWatcher that I want to have send a message to Self, which would then send a message to a child. However, I am running into an exception about there being no Actor Context because of async await. I was following the File Observer Actor example on GitHub.

Anywho onto the questions, first is there any way to preserve Self in this situation? I tried just putting it in a property on the Actor.

Second, is it frowned upon to store Context in a property so I can use it to create a child in this situation? This is what I ended up doing, so I wanted to make sure it wouldn't cause any terrible issues.

Example of what was broken:

private void FileWatcherOnCreated(object sender, FileSystemEventArgs e)
{
    ProcessFile(e.FullPath);
}

private void ProcessFile(string filePath)
{
    var message = new MyMessage(filePath);
    _self.Tell(message);
}
This is what I ended up with:
private void ProcessFile(string filePath)
{
    var message = new MyMessage(filePath);

    var props = _context.DI().Props<MyChildActor>();

    //_context here is set in the constructor of the Actor as a readonly property.
    var child = _context.ActorOf(props, someName);

    child.Tell(message, ActorRefs.NoSender);
}
This seems to work fine
Zetanova
@Zetanova
@Aaronontheweb Is it possible to debug stop / step into with multinode?
Zetanova
@Zetanova
@alexhofer the FileWatcherOnCreated handler is called from outside of the ActorSystem, so you need to save the IActorRef in a member, scope, or state variable.
Context.Self and ActorBase.Self get the instance from a ThreadStatic field.
var self = Self; //scope variable
fileWatcher.Created += (sender, e) => { self.Tell(new MyMessage(e.FullPath)); };
IActorRefs are thread safe and safe to call from everywhere (internal and external code), but properties of the ActorContext can only be accessed from the thread currently processing the actor
Alex Hofer
@alexhofer
Hmm even there, inside self it is throwing because it can't access Context.
Aaron Stannard
@Aaronontheweb
@Zetanova if you're launching it from Visual Studio it should be possible - just need to pass in the right commandline arguments
Alex Hofer
@alexhofer
 var context = Context;

_watcher.Created += (sender, e) =>
{

    var message = new MyMessage(e.FullPath);

    var props = context.DI().Props<MyChildActor>();

    var child = context.ActorOf(props, someName);

    child.Tell(message, ActorRefs.NoSender);
};
This worked fine though. Is this sort of thing OK to do @Aaronontheweb ? Context is thread safe too, right?
Aaron Stannard
@Aaronontheweb
@alexhofer context is technically mutable
especially doing things like calling context.ActorOf
so this could, in a busy system, create some problems
Context, the property, throws because it's not meant to be accessed outside the execution context of an actor
Alex Hofer
@alexhofer
Ah, dang. Do you have any tips for somehow getting a message back to Self from within a Callback?
Aaron Stannard
@Aaronontheweb
and we park the Context inside a ThreadStatic variable
closing over self is perfectly fine
I do that all the time when I'm doing TPL stuff
the way I'd rearchitect that code you included is to have the event handler send a message to self
which, in turn, causes self to create a child and forward the message
etc
does that make sense?
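A minimal sketch of that rearrangement, assuming the directory is passed in through the constructor and the child is created with plain Props.Create rather than the DI extension used earlier. FileWatcherActor, the _directory field, and the body of MyChildActor are hypothetical.

```csharp
using System.IO;
using Akka.Actor;

// Hypothetical message type mirroring MyMessage from the discussion.
public sealed class MyMessage
{
    public MyMessage(string filePath) { FilePath = filePath; }
    public string FilePath { get; }
}

// Hypothetical child; stands in for MyChildActor.
public class MyChildActor : ReceiveActor
{
    public MyChildActor()
    {
        Receive<MyMessage>(msg => { /* process msg.FilePath */ });
    }
}

public class FileWatcherActor : ReceiveActor
{
    private readonly string _directory;
    private FileSystemWatcher _watcher;

    public FileWatcherActor(string directory)
    {
        _directory = directory;

        Receive<MyMessage>(msg =>
        {
            // Runs inside the actor, so Context is valid here.
            var child = Context.ActorOf(Props.Create(() => new MyChildActor()));
            child.Forward(msg);
        });
    }

    protected override void PreStart()
    {
        // Capture Self while the actor context is active.
        var self = Self;

        _watcher = new FileSystemWatcher(_directory, "*.csv");
        // The callback runs on a non-actor thread and only touches the captured IActorRef.
        _watcher.Created += (sender, e) => self.Tell(new MyMessage(e.FullPath), ActorRefs.NoSender);
        _watcher.EnableRaisingEvents = true;
    }

    protected override void PostStop()
    {
        _watcher?.Dispose();
    }
}
```

The event handler never reads Context or Self directly; child creation happens only in the Receive handler.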
Alex Hofer
@alexhofer
Hmm, it's throwing because there's no Actor Context inside there. This is inside a behavior, if that matters.
Aaron Stannard
@Aaronontheweb
var self = Self;

_watcher.Created += (sender, e) =>
{
    var message = new MyMessage(e.FullPath);

    self.Tell(message, ActorRefs.NoSender);
};
does that throw?
because Self is just Context.Self
Alex Hofer
@alexhofer
So, at the point where I am doing var self = Self, Self is showing up as a NotSupportedException with this:
System.NotSupportedException: There is no active ActorContext, this is most likely due to use of async operations from within this actor.
   at Akka.Actor.ActorBase.get_Context()
   at Akka.Actor.ActorBase.get_Self()
Which is weird, because self shows up as the right actor. But then it never receives the message it tells itself.
Alex Hofer
@alexhofer
Maybe it just doesn't work in a unit test scenario?
Aaron Stannard
@Aaronontheweb
is the self = Self happening outside of the actor?
Alex Hofer
@alexhofer

Nope, this is basically what is running, minus some logging:

        private void Active()
        {
            Receive<MyMessage>(message =>
            {
                TellNewChild(message);
            });

            _watcher = new FileSystemWatcher("directory", "*.csv")
            {
                NotifyFilter = NotifyFilters.LastAccess
                               | NotifyFilters.LastWrite
                               | NotifyFilters.FileName
                               | NotifyFilters.DirectoryName,

                EnableRaisingEvents = true
            };

            var self = Self;

            _watcher.Created += (sender, e) =>
            {
                var message = new MyMessage(e.FullPath);
                self.Tell(message, ActorRefs.NoSender);
            };
        }

But it never hits the MyMessage receive handler. It is in this Active behavior.

Wellp, @Aaronontheweb it looks to be working now. I guess I just needed to...clean the solution and rebuild. D:
Aaron Stannard
@Aaronontheweb
:+1:
glad I could help
Alex Hofer
@alexhofer
Thanks @Aaronontheweb ! :)
Greatsamps
@Greatsamps

I have a question about killing actors. Is there a way to kill an actor, including the message it's currently processing? From looking at the docs, it appears that both the shutdown and kill functions send a message to the actor that would get processed after the current message.

We have a situation where sometimes some code within the actor can hang and block, so we would like to be able to restart the actor from above. Or do we need to add timeouts etc. to the actor's message processing code?

Bartosz Sypytkowski
@Horusiath

@Greatsamps as you've probably noticed, an actor's Receive method is just a block of synchronous code. Once it has started executing, it will run until completion. There's not much we can do here, as only the operating system can interrupt code running on a given thread, and the OS itself is not aware of the existence of actors.

Usually the reason behind long-running blocking code is either synchronous I/O (which you should never do; pretty much every .NET API now exposes an async API for that) or a long-running loop. For the latter case, one way is to split the long-running process into a sequence of steps or batches and send them as operations to Self, e.g. instead of having:

Receive<Msg>(msg => {
    var sum = 0;
    for (var i = 0; i < msg.Count; i++) {
        sum += DoSomething(i);
    }
    Sender.Tell(sum);
});

You can model it as:

Receive<Msg>(msg => {
    var sum = msg.State;
    // process message in batches of 1000 elements
    var until = Math.Min(msg.From + 1000, msg.Count);
    var i = msg.From;
    for (; i < until; i++) {
        sum += DoSomething(i);
    }
    if (i < msg.Count) {
        Self.Forward(new Msg(state: sum, from: i, count: msg.Count));
    } else {
        Sender.Tell(sum);
    }
});
This way a potentially long running sequence can be split into shorter steps, which can be interleaved with other messages, including stop signals.
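The Msg type itself is not shown above. One hypothetical shape that matches the named arguments in the constructor call (state, from, count) could look like this; the int property types are an assumption.

```csharp
// Hypothetical message type for the batched loop; immutable, as actor messages should be.
public sealed class Msg
{
    public Msg(int state, int from, int count)
    {
        State = state;
        From = from;
        Count = count;
    }

    // Running sum carried from the previous batch.
    public int State { get; }

    // Index at which the next batch starts.
    public int From { get; }

    // Total number of elements to process.
    public int Count { get; }
}
```

A caller would start the work with new Msg(state: 0, from: 0, count: n). Since each continuation is sent with Forward, the original sender is preserved, so the final Sender.Tell(sum) still reaches the actor that asked.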
mijoki
@mijoki

I've encountered this error whilst using Akka.Persistence.MongoDb and Akka.Cluster.Sharding

Exception in ReceiveRecover when replaying event type ["Akka.Cluster.Sharding.PersistentShardCoordinator+ShardRegionRegistered"] with sequence number [1] for persistenceId ["/system/sharding/TestCoordinator/singleton/coordinator"]
System.ArgumentNullException: Value cannot be null.
Parameter name: key
at System.Collections.Immutable.Requires.FailArgumentNullException(String parameterName)
at System.Collections.Immutable.ImmutableDictionary`2.ContainsKey(TKey key)
at Akka.Cluster.Sharding.PersistentShardCoordinator.State.Updated(IDomainEvent e)
at Akka.Cluster.Sharding.PersistentShardCoordinator.ReceiveRecover(Object message)
at Akka.Actor.ActorBase.AroundReceive(Receive receive, Object message)
at Akka.Persistence.Eventsourced.<>c__DisplayClass92_0.<Recovering>b__1(Receive receive, Object message)

This will only occur when state-store-mode is set to persistence and after the cluster has been created once. It seems that the ShardRegionRegistered object is unable to be serialized correctly (See below), presumably due to the fact that it contains an IActorRef.
This does not happen when state-store-mode is set to ddata, however I would ideally prefer to use remember-entities which I cannot get to work with ddata.
{
    "_id" : "/system/sharding/TestCoordinator/singleton/coordinator_4",
    "PersistenceId" : "/system/sharding/TestCoordinator/singleton/coordinator",
    "SequenceNr" : NumberLong(4),
    "IsDeleted" : false,
    "Payload" : {
        "_t" : "Akka.Cluster.Sharding.PersistentShardCoordinator+ShardRegionRegistered, Akka.Cluster.Sharding"
    },
    "Manifest" : "Akka.Persistence.Persistent, Akka.Persistence",
    "Ordering" : Timestamp(1568893010, 1),
    "Tags" : null,
    "SerializerId" : null
}

Zetanova
@Zetanova

@Greatsamps dotnet has the Thread.Abort() feature, but it's a rabbit hole and I don't know if it's still supported in dotnet core. If there is really a situation where a Thread.Abort seems required, it is nearly always better to restart the whole process, otherwise you can get many bad side effects. If you really need to abort some process and don't have any other option like a CancellationToken or a timeout parameter, then you can create a dedicated Thread/Task inside the actor and abort it from the actor thread:

Thread taskThread = null;
var task = Task.Run(() => {
    taskThread = Thread.CurrentThread;
    Thread.Sleep(Timeout.Infinite);
});

Thread.Sleep(2000); //wait to start the new thread

taskThread.Abort();

But it should really only be used for a third-party library API where there is no other option to cancel the process. Calling GC.Collect() after the abort is a good idea too.
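For code that does accept a CancellationToken, a timeout can be applied cooperatively instead of aborting a thread. Note that Thread.Abort() throws PlatformNotSupportedException on .NET Core and later, so the cooperative route is the only portable one there. A minimal sketch, where CancellableWork and RunWithTimeoutAsync are hypothetical names:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableWork
{
    // Runs the work with a token that is cancelled after the timeout.
    // Returns true if the work finished, false if it was cancelled.
    public static async Task<bool> RunWithTimeoutAsync(
        Func<CancellationToken, Task> work, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource(timeout))
        {
            try
            {
                await work(cts.Token);
                return true;
            }
            catch (OperationCanceledException)
            {
                // Raised by work that observes the token once the timeout elapses.
                return false;
            }
        }
    }
}
```

For example, RunWithTimeoutAsync(ct => Task.Delay(Timeout.Infinite, ct), TimeSpan.FromSeconds(2)) completes with false after about two seconds. This only helps when the called code actually checks the token; code that ignores it will keep running.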