/// <summary>
/// Fans a single message out to a set of target actors while keeping at most
/// <c>parallelWorkers</c> targets busy at any time. The first message this actor
/// receives is the one that gets distributed; every message received afterwards
/// is counted as a reply from one target, relayed to the sender of the first
/// message, and used as the trigger for messaging the next waiting target.
/// The actor stops itself once no targets are waiting and none are outstanding.
/// </summary>
public class ThrottledMessenger : ReceiveActor
{
    // Number of targets that have been messaged and have not yet been answered for.
    private int busyWorkers;

    // Targets that have not been messaged yet, in their original order.
    private readonly Queue<IActorRef> waitingActors;

    // Sender of the first message; every reply is relayed to it.
    private IActorRef originalSender;

    // The first message; re-sent to each waiting target as capacity frees up.
    private object originalMessage;

    /// <summary>
    /// Creates the messenger.
    /// </summary>
    /// <param name="parallelWorkers">
    /// Maximum number of targets messaged up front. When this is zero or negative
    /// no target is ever messaged and the actor stops on its first message.
    /// </param>
    /// <param name="refs">
    /// Targets to message. The sequence is enumerated exactly once, here in the
    /// constructor, so lazy or non-repeatable sequences are safe to pass.
    /// </param>
    public ThrottledMessenger(int parallelWorkers, IEnumerable<IActorRef> refs)
    {
        // Materialize once: the original enumerated refs both here and later in the
        // handler, which is unsafe for deferred or single-pass sequences.
        var targets = refs.ToList();
        var firstBatch = targets.Take(parallelWorkers).ToList();
        waitingActors = new Queue<IActorRef>(targets.Skip(parallelWorkers));

        ReceiveAny(x =>
        {
            originalSender = Sender;
            originalMessage = x;

            if (firstBatch.Count == 0)
            {
                // Nothing was dispatched, so no reply can ever arrive to drive the
                // stop check in the relaying state; stop now instead of lingering.
                Context.Stop(Self);
                return;
            }

            foreach (var aref in firstBatch)
            {
                busyWorkers++;
                aref.Tell(x);
            }

            Become(Messaging);
        });
    }

    /// <summary>
    /// Behavior used after the initial fan-out: relays each incoming message to the
    /// original sender and hands the original message to the next waiting target.
    /// </summary>
    private void Messaging()
    {
        ReceiveAny(msg =>
        {
            // Any message in this state is treated as one target having finished.
            busyWorkers--;

            // Preserve the replying actor as the sender of the relayed message.
            originalSender.Tell(msg, Sender);

            if (waitingActors.TryDequeue(out IActorRef nextActor))
            {
                busyWorkers++;
                nextActor.Tell(originalMessage);
            }
            else if (busyWorkers == 0)
            {
                // Queue drained and nothing outstanding: the work is complete.
                Context.Stop(Self);
            }
        });
    }
}