Aaronontheweb on dev
ClusterStressSpec and Cluster F… (compare)
IActorRef
would make that even easier.
static class CollectionExtensions
{
public static T RemoveFirst<T>(this ICollection<T> items)
{
T item = items.FirstOrDefault();
if (item != null)
{
items.Remove(item);
}
return item;
}
}
public class ThrottledMessenger : ReceiveActor
{
private readonly HashSet<IActorRef> workerPool;
private readonly HashSet<IActorRef> waitingActors;
private IActorRef originalSender;
private object originalMessage;
public ThrottledMessenger(int parallelWorkers, IEnumerable<IActorRef> refs)
{
workerPool = new HashSet<IActorRef>(refs.Take(parallelWorkers));
waitingActors = new HashSet<IActorRef>(refs.Skip(parallelWorkers));
ReceiveAny(x =>
{
originalSender = Sender;
originalMessage = x;
foreach (var aref in workerPool) aref.Tell(x);
Become(Messenging);
});
}
private void Messenging()
{
ReceiveAny(msg =>
{
if (workerPool.Remove(Sender))
{
originalSender.Tell(msg, Sender);
var nextActor = waitingActors.RemoveFirst();
if (nextActor != null)
{
workerPool.Add(nextActor);
nextActor.Tell(originalMessage);
}
else if (workerPool.Count == 0)
{
Context.Stop(Self);
}
}
});
}
}
Hello everybody. I have an actor which can be queried for files on his local filesystem. It will open the desired file and send it to the requesting actor.
Example:
private void OnReceive(FileTransferStart fileTransferStart)
{
var fileInfo = new FileInfo(Path.Combine(m_directory, fileTransferStart.FileName));
FileIO.FromFile(fileInfo).Via(Flow.Create<ByteString>().Select(x => x.ToArray())).To(Sink.ActorRef<byte[]>(Sender, new FileTransferFinished())).Run(Context.Materializer()).ContinueWith(x =>
{
if (!x.Result.WasSuccessful)
{
Logger.ErrorException("Error during file transfer", x.Result.Error);
}
});
}
Now I want to limit the number of active file transfers to some number, e.g. 25.
Can I use the Throttle
functionality for that or do I have to implement something myself?
If throttle can be used, what are the parameters? In the source code I did not find any documentation which is understandable for me ;-).
Thanks in advance
Throttle
will only work on that file. The FileIO returns a bytestring per line i think.
Throttle
might be simple enough. There is plenty of docs available online about akka streams Throttle
Apparently today is the day for file IO questions.
I have a List of filenames... I'd like to spin up a new child per filename, then ask the child how many lines the particular file has, and then I'd like to add the line-counts of all the filenames to another list.
Looking at the Child Per Entity Pattern (https://gigi.nullneuron.net/gigilabs/child-per-entity-pattern-in-akka-net/), I feel like I can foreach filename, generate my child, and that's where I'm kinda stuck... If I Ask instead of Tell (as he uses Tell in the example) I can't realistically bundle it into a task list, can I? A task list can't really yield results, as far as I'm aware...
I've also tried a ForEachAsync (https://stackoverflow.com/a/42570099/1672020) idea, but the Context doesn't work inside that Action.
I'm assuming there's a better solution for what I wanna do, but I don't have the slightest on what that is.
If I Ask instead of Tell (as he uses Tell in the example) I can't realistically bundle it into a task list, can I?
It's possible. Something like this:
var tasks = new List<Task<int>>();
foreach(var filename in filenames)
{
var actor = CreateChild();
tasks.Add(actor.Ask(filename, TimeSpan.FromSeconds(10)));
}
int[] lineCounts = await Task.WhenAll(tasks.ToArray());
Task.WhenAll
. Documentation: https://msdn.microsoft.com/en-us/library/hh194874(v=vs.110).aspx
Tell can't return a result though, can it?
It can't. But child can send result using Tell to parent.
ReceiveAsync<RequestAllInstances>( HandleRequestAllInstancesAsync );
private async Task HandleRequestAllInstancesAsync( RequestAllInstances msg )
{
var sender = Sender;
var list = new List<A> { new A(), new A(), new A() };
await Task.Delay( 10000 ).ConfigureAwait( false );
// Sender.Tell( new RespondAllInstances( list.AsReadOnly() ) ); <-- this is going to crash with missing ActorContext
sender.Tell( new RespondAllInstances( list.AsReadOnly() ) ); // but if saved beforehand then it's ok
}