lujunjie
@LosCaesar_gitlab
How do I implement a group of actors that form a ring and pass messages around it for multiple rounds? Thanks
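A minimal sketch of one way to do this in Akka.NET C# (RingMember, Token and the hop-count approach are illustrative, not from the chat; `system` is an existing ActorSystem):

using System.Linq;
using Akka.Actor;

public sealed class Token
{
    public Token(int remainingHops) { RemainingHops = remainingHops; }
    public int RemainingHops { get; }
}

public class RingMember : ReceiveActor
{
    private IActorRef _next;

    public RingMember()
    {
        // the first message wires up the neighbour, closing the ring
        Receive<IActorRef>(next => _next = next);

        // every token is passed along until its hop budget is spent
        Receive<Token>(token =>
        {
            if (token.RemainingHops > 0)
                _next.Tell(new Token(token.RemainingHops - 1));
        });
    }
}

// wiring: N members, each pointed at the next, then inject a token
// that travels N * rounds hops around the ring
var members = Enumerable.Range(0, 5)
    .Select(i => system.ActorOf<RingMember>($"member-{i}"))
    .ToList();
for (var i = 0; i < members.Count; i++)
    members[i].Tell(members[(i + 1) % members.Count]);
members[0].Tell(new Token(remainingHops: 5 * 3)); // 3 full rounds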
Vasily Kirichenko
@vasily-kirichenko
@Horusiath I need to "ask" an actor from outside the actor system (it's a call from Fable.Remoting). I only have an IActorRef<'a> on which I can call <?, but it's untyped. You suggest passing an IActorRef<> to receive the response, but where can I get one?
Bartosz Sypytkowski
@Horusiath
@vasily-kirichenko this is what the ask overload I linked you to is all about (instead of a message it takes a message factory, which receives the temporary callback address as a parameter). But it's not exposed directly in Akkling yet
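In Akka.NET C# terms (the Akkling/F# surface differs) that overload looks roughly like this; it is assumed to be available in recent Akka.NET versions, and GetUser/User/userActor are made-up names:

using System;
using Akka.Actor;

// made-up request/response types for illustration
public sealed class GetUser
{
    public GetUser(string id, IActorRef replyTo) { Id = id; ReplyTo = replyTo; }
    public string Id { get; }
    public IActorRef ReplyTo { get; }
}
public sealed class User { /* ... */ }

// from outside the actor system (e.g. a remoting handler): the factory overload of Ask
// hands you the temporary callback IActorRef so you can embed it in the request
var user = await userActor.Ask<User>(
    replyTo => new GetUser("42", replyTo),
    TimeSpan.FromSeconds(5));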
Vasily Kirichenko
@vasily-kirichenko
@Horusiath thanks. We need to add it to Akkling.
Stijn Herreman
@stijnherreman
When actor A sends a message to actor B, and B is supposed to reply to A with some data, how do I inform A of a failure in one of B's children? It seems this cannot be done with a supervisor strategy? The goal is to let A know it should stop its process and shut down, logging a failure somewhere.
Stijn Herreman
@stijnherreman
Maybe I need to clarify a bit. B is a supervisor that Forwards messages to its children. Failures in the children are escalated to B, so I'm looking for a way to have B Tell A about the failure.
Vasily Kirichenko
@vasily-kirichenko
@stijnherreman You can watch an actor.
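That is, deathwatch. A rough sketch of A watching B and reacting to its termination (ActorA/actorB are illustrative names; note that watching reports that B stopped, not the exception that caused it):

using Akka.Actor;
using Akka.Event;

public class ActorA : ReceiveActor
{
    public ActorA(IActorRef actorB)
    {
        Context.Watch(actorB); // deathwatch: we get a Terminated message when B stops

        Receive<Terminated>(t =>
        {
            Context.GetLogger().Error("{0} terminated, aborting", t.ActorRef.Path);
            Context.Stop(Self); // stop our own processing
        });

        // ... normal message handlers ...
    }
}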
Stijn Herreman
@stijnherreman
@vasily-kirichenko Thanks, I'll take a look
Vagif Abilov
@object
I wonder what would be the best way to investigate timeout problems with the SqlServer adapter. Recovery of persistent actors in our system sometimes fails with a timeout exception. When we traced low-level db activity we found that sometimes an INSERT into EventJournal doesn't return for a long time (example: 42 seconds), and it blocks a SELECT for the same PersistentId. Such a long time for an INSERT doesn't make sense, but we are using the BatchingJournal. Can this be part of the problem? /cc @Horusiath
Vasily Kirichenko
@vasily-kirichenko
select (nolock)?
Vagif Abilov
@object
We don't do these SELECTs.
It's all the Akka EventJournal adapter
this one? :)
Vagif Abilov
@object
Yes. Temporarily switched from Batching to non-batching journal. No difference
Bartosz Sypytkowski
@Horusiath
@object you can relax the isolation level of your transactions using the isolation-level config setting
there are also max-batch-size, max-buffer-size and max-concurrent-operations
Bartosz Sypytkowski
@Horusiath
it works like this: the batching journal buffers incoming write requests (the max size of that buffer is max-buffer-size=500000; once that size is reached, any further writes are denied until the buffer is freed). These writes are picked into batches (up to max-batch-size=100) and every batch is essentially a single db request - keep in mind that if you store multiple events in a single Persist request, they all count as 1 atomic write.
Last, since db requests are asynchronous by default, we don't call them one by one. Instead we execute many of them concurrently, up to a given limit (max-concurrent-operations=64). Keep in mind that every operation uses a separate db connection - ADO.NET pools them, and afaik the pool size is 100 by default.
Bartosz Sypytkowski
@Horusiath
Also, the max concurrency limit is specific to the batching journal and not shared with the snapshot store, but they still use the same ADO.NET pool given the same connection string, so they can compete for connections in the pool. If you want a separate pool for the snapshot store you can specify a different connection string for it - if I remember correctly, ADO.NET allocates connection pools based on connection string matching
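For reference, roughly where those settings live for the SqlServer plugins (the key names are the ones mentioned above; check the plugin's reference config for exact paths, value spellings and defaults):

using Akka.Actor;
using Akka.Configuration;

var config = ConfigurationFactory.ParseString(@"
akka.persistence.journal.sql-server {
  connection-string = ""<journal connection string>""
  isolation-level = read-committed      # relax if writers block readers
  max-batch-size = 100                  # writes per db round-trip
  max-buffer-size = 500000              # pending writes before new ones are denied
  max-concurrent-operations = 64        # parallel db requests, each on a pooled connection
}
akka.persistence.snapshot-store.sql-server {
  # a different connection string gives the snapshot store its own ADO.NET pool
  connection-string = ""<snapshot connection string>""
}");

var system = ActorSystem.Create("app", config);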
Vasily Kirichenko
@vasily-kirichenko
@Horusiath It seems you don't use bulk insert, why?
Vagif Abilov
@object
Tried different isolation levels, no difference.
Bartosz Sypytkowski
@Horusiath
@vasily-kirichenko I wanted to, I just haven't had time to implement it. The current implementation was more in line with the one that already existed before, so I wanted to be sure it could work as a drop-in replacement without breaking things. Tbh, if akka.persistence were implemented on top of streams, the batching journal and the normal one would probably differ only by the stage in between performing the batching.
Vasily Kirichenko
@vasily-kirichenko
Yeah, I see.
Jay DeBoer
@jaydeboer
@Danthar Thanks, I was reading other docs that were not as clear. Thanks for that link, it really helped explain it. :)
mikeprag
@mikeprag
Hi All... I have implemented Akka.NET in a dispatcher/worker pattern to distribute load across multiple machines for a job processing system. On the remote machines I have a service running with Akka workers waiting to accept remote requests. When a worker (actor) is created, it instantiates a type using Assembly.LoadFrom - the location to load the dll from is passed as part of the worker creation request. Then, requests to process an xml data file are sent to the worker actors, which should use the assembly that was loaded when the worker was created to process the data. My main dispatcher works with batches of data (a job) and, in between jobs, stops the remote workers. When a new job is created, it creates new workers to process the job. This is working fine, however on subsequent jobs where the source .dll has changed, it seems that the old .dll is being used instead of the new one. On each new job, the dispatcher first pulls the assets (dll and other files) to a new file share where the worker should load them from, and I can see from my logs that the new worker is using the correct location, however the results show it's using the .dll from a previous job (worker). I am creating my actors with unique names and I thought this would give some isolation, as new, unique workers would be created. What am I missing that causes the remote process to re-use workers or the .dll? I know something is not quite right, as the remote service keeps a lock on the .dll even after it's finished and the workers have been stopped: when I come to delete the assets in the shared folder, I'm unable to until I shut down the remote service. Sorry for the long-winded description - any ideas?
Stijn Herreman
@stijnherreman
@mikeprag You cannot unload an assembly once it's been loaded, nor can you reload it. You need to create a new AppDomain if you want to do that.
mikeprag
@mikeprag
Hi Stijn... thanks for the quick response. I thought (clearly incorrectly) that multiple actors would have separate AppDomains, but I guess this is not the case. Is there any way in the Akka config to have workers created in separate AppDomains so I can 'config' out this issue, or do I need to write this into my worker?
Stijn Herreman
@stijnherreman
I think this is something you need to write yourself, but I'm not sure. Others with more experience might be able to confirm that.
mikeprag
@mikeprag
@stijnherreman - Thanks. I guess it wouldn't be too hard to create an AppDomain to execute on. Is this just a general issue / thing to bear in mind with Akka that if you change any of the binaries for execution, you would have to restart the remote host process every time? Does anyone know of a way to do this from Akka config? Is this something to do with the Akka dispatcher?
Vagif Abilov
@object
@mikeprag It doesn't sound like running actors in separate AppDomains fits well with the nature of actors. Actors have a very small footprint (a few hundred bytes) and they don't have allocated threads; they just borrow threads to process messages. AFAIK an AppDomain drags along a memory footprint measured in megabytes, not kilobytes (I didn't find any figures but would be very surprised if it's below 1 MB). This shouldn't stop you from creating additional domains, but I think you need to do it outside the actor system.
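A rough .NET Framework-only sketch of that idea (AppDomains are unavailable on .NET Core; PluginRunner, JobIsolation and the parameters are made up), kept as a helper the worker actor calls rather than part of the actor system itself:

using System;
using System.Reflection;

// lives in a shared assembly both domains can load
public class PluginRunner : MarshalByRefObject
{
    public string Process(string assemblyPath, string xmlPayload)
    {
        var asm = Assembly.LoadFrom(assemblyPath); // loaded only into this AppDomain
        // ... locate the processor type in asm and run it against xmlPayload ...
        return "result";
    }
}

public static class JobIsolation
{
    // called by (not inside) the worker actor for each job
    public static string RunJobIsolated(string jobId, string dllPath, string xml)
    {
        var domain = AppDomain.CreateDomain("job-" + jobId);
        try
        {
            var runner = (PluginRunner)domain.CreateInstanceAndUnwrap(
                typeof(PluginRunner).Assembly.FullName,
                typeof(PluginRunner).FullName);
            return runner.Process(dllPath, xml);
        }
        finally
        {
            AppDomain.Unload(domain); // releases the lock on the job's dll
        }
    }
}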
PinkyBrain
@PinkyBrain_gitlab
What's the right pattern for implementing throttling with actors? I would like to have an actor receive a message with a collection of things to process; it would create a new actor to process each one, and it should be able to limit the number of outstanding operations (child actors that have not replied yet).
If I understand the one-message-at-a-time guarantee correctly: once this actor receives the message with the collection of items and starts creating child actors, how will it know that one of those children has replied, if it can't process that reply until it has finished processing the message with the collection?
Would I need to use PipeTo? Would it be safe to have a mutable field in the parent actor keeping track of the number of outstanding child actors, or can I get this from Akka itself?
v1rusw0rm
@v1rusw0rm
@PinkyBrain_gitlab seems like Router is what you need: https://getakka.net/articles/actors/routers.html
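For example, a round-robin pool where the pool size is the concurrency cap (ItemProcessor, system and itemsToProcess are made-up names; this caps concurrency only if each routee finishes one item before taking the next from its mailbox):

using Akka.Actor;
using Akka.Routing;

// ten routees behind one router ref; at most ten items are being worked on at once
var workerPool = system.ActorOf(
    Props.Create<ItemProcessor>().WithRouter(new RoundRobinPool(10)),
    "item-processors");

foreach (var item in itemsToProcess)
    workerPool.Tell(item);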
AndreSteenbergen
@AndreSteenbergen
@PinkyBrain_gitlab you could create some sort of aggregator with a HashSet of actors which are still active. On a received message you remove the actor from the set and give any outstanding work to a new actor. You can decide whether you want to collect all data before returning to the original sender, or pass it along directly. You could start this intermediary actor with a deathwatch. If no outstanding work is present and the active HashSet is empty, you could kill off the actor; this way the original sender knows when you are done processing.
You do need to know which actor type to start for each message, so you could create the intermediate actor with a Dictionary<Type, Type> (message => actor), and of course a number for how big the active set may be (concurrency, e.g. 50). When all messages are processed, simply return all results if not already done, and die.
PinkyBrain
@PinkyBrain_gitlab
@v1rusw0rm So from reading that, it seems I would go for a round-robin pool router, right? The operation that each routee performs is async; does this mean I have to force it to complete before taking the next message, otherwise the routee will start processing the next message in its queue and I lose the throttling effect of the router?
Does that sound like the way to go?
AndreSteenbergen
@AndreSteenbergen
This actor sends one message to a very large set of actors; I had issues sending out too many messages at once, so I created a throttler. It doesn't do exactly what you need, but it can give you a start.
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;

public class ThrottledMessenger : ReceiveActor
{
    private int workerPool;                          // workers with a request in flight
    private readonly Queue<IActorRef> waitingActors; // workers not yet given the message
    private IActorRef originalSender;
    private object originalMessage;

    public ThrottledMessenger(int parallelWorkers, IEnumerable<IActorRef> refs)
    {
        var allRefs = refs.ToList(); // materialize so the sequence isn't enumerated twice
        waitingActors = new Queue<IActorRef>(allRefs.Skip(parallelWorkers));

        ReceiveAny(x =>
        {
            originalSender = Sender;
            originalMessage = x;

            // send the message to the first batch of workers only
            foreach (var aref in allRefs.Take(parallelWorkers))
            {
                workerPool++;
                aref.Tell(x);
            }
            Become(Messenging);
        });
    }

    private void Messenging()
    {
        ReceiveAny(msg =>
        {
            // a worker replied: forward its answer and hand the message to the next waiting worker
            workerPool--;
            originalSender.Tell(msg, Sender);
            if (waitingActors.TryDequeue(out IActorRef nextActor))
            {
                workerPool++;
                nextActor.Tell(originalMessage);
            }
            else if (workerPool == 0)
            {
                Context.Stop(Self); // every worker has replied and none are waiting
            }
        });
    }
}
BTW this only works when the actor responds with a Sender.Tell( stuff ) after it's done.
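A minimal worker that satisfies that contract, plus the wiring, might look like this (EchoWorker, the counts and workItem are made up; the throttler is created from inside a parent actor so the forwarded replies come back to it):

using System.Linq;
using Akka.Actor;

public class EchoWorker : ReceiveActor
{
    public EchoWorker()
    {
        ReceiveAny(msg =>
        {
            // ... do the real work here ...
            Sender.Tell($"done: {Self.Path.Name}"); // reply when finished, as required above
        });
    }
}

// inside some parent actor: 100 workers, at most 10 in flight at a time;
// each worker's reply is forwarded back to this parent as it arrives
var workers = Enumerable.Range(0, 100)
    .Select(i => Context.ActorOf<EchoWorker>($"worker-{i}"))
    .ToList();
Context.ActorOf(Props.Create(() => new ThrottledMessenger(10, workers)))
    .Tell(workItem);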
PinkyBrain
@PinkyBrain_gitlab
@AndreSteenbergen Yeah, that's what I need anyway; give me 5 minutes to understand what you're doing there
AndreSteenbergen
@AndreSteenbergen
I can walk you through the code, if you want.
It basically sends a single message to however many actors I've chosen as the parallel amount (.Take(parallelWorkers));
all the other actors are enqueued and get the message later.