AndreSteenbergen
@AndreSteenbergen
This class sends one message to a very large set of actors. I had issues sending out too many messages at once, so I created a throttler. It doesn't do exactly what you need, but it can give you a start.
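using System.Collections.Generic;
using System.Linq;
using Akka.Actor;

// Sends one message to many actors while keeping at most
// `parallelWorkers` requests in flight at any time.
public class ThrottledMessenger : ReceiveActor
{
    private int workerPool; // number of requests still awaiting a reply
    private readonly Queue<IActorRef> waitingActors; // targets not yet contacted
    private IActorRef originalSender;
    private object originalMessage;

    public ThrottledMessenger(int parallelWorkers, IEnumerable<IActorRef> refs)
    {
        // Materialize once so a lazy sequence is not enumerated twice.
        var targets = refs.ToList();
        waitingActors = new Queue<IActorRef>(targets.Skip(parallelWorkers));

        // The first message received is the one to fan out.
        ReceiveAny(x =>
        {
            originalSender = Sender;
            originalMessage = x;

            foreach (var aref in targets.Take(parallelWorkers))
            {
                workerPool++;
                aref.Tell(x);
            }
            Become(Messenging);
        });
    }

    // Every later message is treated as a reply from one of the targets.
    private void Messenging()
    {
        ReceiveAny(msg =>
        {
            workerPool--;
            // Forward the reply, keeping the replying actor as the sender.
            originalSender.Tell(msg, Sender);
            if (waitingActors.TryDequeue(out IActorRef nextActor))
            {
                workerPool++;
                nextActor.Tell(originalMessage);
            }
            else if (workerPool == 0)
            {
                // All targets have replied.
                Context.Stop(Self);
            }
        });
    }
}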
BTW, this only works when the target actor replies with a Sender.Tell(stuff) after it's done.
PinkyBrain
@PinkyBrain_gitlab
@AndreSteenbergen Yeah, that's what I need anyway. Give me 5 minutes to understand what you're doing there.
AndreSteenbergen
@AndreSteenbergen
I can walk you through the code, if you want
It basically sends the message to the first parallelWorkers actors (.Take(parallelWorkers))
all the other actors are enqueued
PinkyBrain
@PinkyBrain_gitlab
Sure, so I would need to create a pool of actors as large as the batch of messages I want to process beforehand right?
AndreSteenbergen
@AndreSteenbergen
Do you know the actor type for the messages?
PinkyBrain
@PinkyBrain_gitlab
yes
AndreSteenbergen
@AndreSteenbergen
is it just 1?
actor type
PinkyBrain
@PinkyBrain_gitlab
yes
AndreSteenbergen
@AndreSteenbergen
You can create the actors up front, as many as the size of your pool - better yet: just create them when you receive the messages to process
every time a result comes back, you can hand that actor one of the messages from your queue
I guess it isn't actually a lot different from this code, you would give it an IEnumerable<object> instead of an IEnumerable<IActorRef>
the ctor can take int numberOfWorkers and a way to create the actor (Props)
the initial receive would be the IEnumerable<object> (your message), and you create your batch of actors in that Receive
enqueue all messages you can't process yet
and switch to a receiving state using Become
AndreSteenbergen
@AndreSteenbergen
every time you receive a result, you can send it to the parent actor, and give the actor which produced the result its next message
when done, kill off the actor
ReceiveAny in the constructor would be Receive<ObjectHoldingAllMessagesToProcess>
I hope I make sense
AndreSteenbergen
@AndreSteenbergen
The actor creating this intermediate actor can watch it using Context.WatchWith(intermediate, new DoneProcessingLotsOfMessages()), which delivers DoneProcessingLotsOfMessages to the watcher once the intermediate actor stops
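A minimal sketch of that watch pattern, assuming the Akka NuGet package is referenced. The Coordinator class and the way the Props reach it are hypothetical; only the DoneProcessingLotsOfMessages name comes from the message above.

```csharp
using Akka.Actor;

// Marker message delivered to the watcher when the watched actor stops.
public sealed class DoneProcessingLotsOfMessages { }

// Hypothetical parent actor that creates and watches the intermediate actor.
public class Coordinator : ReceiveActor
{
    public Coordinator(Props intermediateProps)
    {
        var intermediate = Context.ActorOf(intermediateProps, "intermediate");

        // Instead of the default Terminated message, the watcher receives
        // the custom message given here once `intermediate` has stopped.
        Context.WatchWith(intermediate, new DoneProcessingLotsOfMessages());

        Receive<DoneProcessingLotsOfMessages>(_ =>
        {
            // The intermediate actor has stopped; react here,
            // for example by starting the next batch.
        });
    }
}
```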
PinkyBrain
@PinkyBrain_gitlab
How about this: the ThrottledMessenger receives a message with the batch and first puts the items into a queue. It then dequeues parallelWorkers items, creates an actor for each, and Tells each actor its item. Then it Becomes a new state where, on each message received, it dequeues another item, creates another actor, and sends it that item, until the queue is empty
then I don't need to keep track of the workerPool or create all my actors up front
AndreSteenbergen
@AndreSteenbergen
That's what I meant with my first line ;) better yet: just create them when you receive the messages to process
You don't need to create a new actor each time
the actor which processed the previous message can process a new one
PinkyBrain
@PinkyBrain_gitlab
the ThrottledMessenger can collect the results of all its children, so I would have just one originalSender.Tell, in the case where the queue is empty, right? So if the actor that wants the batch processed sends the ThrottledMessenger an Ask, would that work?
ah right, so I would have at most parallelWorkers actors
AndreSteenbergen
@AndreSteenbergen
Yes, that would work
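A minimal sketch of the design discussed above, assuming the Akka NuGet package is referenced and that each worker replies with Sender.Tell(result) once per item. The names BatchThrottler, ProcessBatch and BatchResult are hypothetical and do not come from the discussion.

```csharp
using System;
using System.Collections.Generic;
using Akka.Actor;

// Hypothetical request: the whole batch of items to process.
public sealed class ProcessBatch
{
    public ProcessBatch(IReadOnlyList<object> items) { Items = items; }
    public IReadOnlyList<object> Items { get; }
}

// Hypothetical reply: all collected results, sent once at the end.
public sealed class BatchResult
{
    public BatchResult(IReadOnlyList<object> results) { Results = results; }
    public IReadOnlyList<object> Results { get; }
}

public class BatchThrottler : ReceiveActor
{
    private readonly Queue<object> pending = new Queue<object>();
    private readonly List<object> results = new List<object>();
    private IActorRef originalSender;
    private int inFlight; // items handed to workers and not yet answered

    public BatchThrottler(int parallelWorkers, Props workerProps)
    {
        if (parallelWorkers < 1)
            throw new ArgumentOutOfRangeException(nameof(parallelWorkers));

        Receive<ProcessBatch>(batch =>
        {
            originalSender = Sender;
            foreach (var item in batch.Items) pending.Enqueue(item);

            if (pending.Count == 0)
            {
                // Nothing to do: reply immediately with an empty result.
                originalSender.Tell(new BatchResult(results.ToArray()));
                Context.Stop(Self);
                return;
            }

            // Create at most parallelWorkers children, one item each.
            while (inFlight < parallelWorkers && pending.Count > 0)
            {
                Context.ActorOf(workerProps).Tell(pending.Dequeue());
                inFlight++;
            }
            Become(Collecting);
        });
    }

    private void Collecting()
    {
        // Every message in this state is treated as a worker's result.
        ReceiveAny(result =>
        {
            results.Add(result);
            inFlight--;

            if (pending.Count > 0)
            {
                // Reuse the worker that just replied for the next item.
                Sender.Tell(pending.Dequeue());
                inFlight++;
            }
            else if (inFlight == 0)
            {
                // Queue drained and all replies in: answer once, then stop.
                // Stopping this actor also stops its child workers.
                originalSender.Tell(new BatchResult(results.ToArray()));
                Context.Stop(Self);
            }
        });
    }
}
```

With this shape the caller could use something like throttler.Ask<BatchResult>(new ProcessBatch(items)), since the only reply to the original sender is the final BatchResult. A worker that never replies would stall the batch, so a timeout on the Ask is probably worth having.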
PinkyBrain
@PinkyBrain_gitlab
This sounds like an implementation of the roundrobin router :P
AndreSteenbergen
@AndreSteenbergen
I was just typing that... it is the router
PinkyBrain
@PinkyBrain_gitlab
hah, cool! i will give this a go, thanks for the help
AndreSteenbergen
@AndreSteenbergen
you can also go for the router
;)
the throttler is an adaptation of the aggregator discussed here: https://bartoszsypytkowski.com/dont-ask-tell-2/
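For comparison, a minimal sketch of the router option, assuming the Akka NuGet package is referenced. The Worker actor and the pool size of 4 are placeholders. Note that a round-robin pool only spreads messages across its routees; it does not cap the number of messages in flight the way the throttler does, because each routee simply queues work in its own mailbox.

```csharp
using Akka.Actor;
using Akka.Routing;

// Placeholder worker: echoes each item back to the sender as its "result".
public class Worker : ReceiveActor
{
    public Worker()
    {
        ReceiveAny(item => Sender.Tell(item));
    }
}

public static class RouterDemo
{
    public static void Main()
    {
        var system = ActorSystem.Create("demo");

        // A pool router creates 4 Worker routees and hands incoming
        // messages to them in round-robin order.
        var router = system.ActorOf(
            Props.Create<Worker>().WithRouter(new RoundRobinPool(4)),
            "workers");

        for (var i = 0; i < 10; i++)
        {
            router.Tell(i);
        }

        system.Terminate().Wait();
    }
}
```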
PinkyBrain
@PinkyBrain_gitlab
I need to create the throttling actor from inside a method on a static class, would I have to pass that method the context in order to create the child?
or is there a way to pass the actor ref and create it from that?
AndreSteenbergen
@AndreSteenbergen
You can pass IActorRefs everywhere, that's the location transparency
You can create them wherever you have a context, an ActorSystem, or are inside the actor itself
Context.ActorOf, or actorSystem.ActorOf
PinkyBrain
@PinkyBrain_gitlab
the context can only be accessed from within the actor class though
if I'm in a static class method I can't see the Context, right?
and I don't have the system either
I guess I should create that child actor outside the static method and just pass it that actorref
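A minimal sketch of the other option raised above, passing the factory into the static method, assuming the Akka NuGet package is referenced. The ThrottlerFactory name is hypothetical. Both ActorSystem and an actor's Context implement IActorRefFactory, so the same helper works from either place.

```csharp
using Akka.Actor;

// Hypothetical static helper: it cannot see Context itself,
// so the caller passes in whatever can create actors.
public static class ThrottlerFactory
{
    public static IActorRef Create(IActorRefFactory factory, Props props)
    {
        // Called with an actor's Context, the new actor becomes that
        // actor's child; called with an ActorSystem, it is a top-level actor.
        return factory.ActorOf(props);
    }
}
```

From inside an actor this would be called as ThrottlerFactory.Create(Context, props). Context is only valid on the actor's own thread while it is constructing or handling a message, so the helper should use it immediately rather than store it.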
Shukhrat Nekbaev
@snekbaev
https://github.com/petabridge/akkadotnet-code-samples/tree/master/TestKit#how-do-i-change-the-configuration-of-the-testactorsystem
Is it possible to use App.config for that, or is it not recommended? I tried, but it seems to be ignored by default. However, I managed to make it work via:
public SomeActor() : base(ConfigurationFactory.Load())
{
}
AndreSteenbergen
@AndreSteenbergen
Hey guys, are there any benchmarks for persistence? I am trying to find out which persistence layer is best for my needs. I am trying to store a high number of actor journals. I can't keep it all in memory, so I need some disk persistence. But I also need it to be fast.