Aaronontheweb on dev
Akka.DependencyInjection.Servic… (compare)
Aaronontheweb on 1.4.16
Aaronontheweb on master
Bump AkkaVersion from 1.4.14 to… Correct warning on circuit too … Added v1.4.16 release notes (#1… and 1 more (compare)
Aaronontheweb on 1.4.16
Hello everybody. I have an actor which can be queried for files on his local filesystem. It will open the desired file and send it to the requesting actor.
Example:
private void OnReceive(FileTransferStart fileTransferStart)
{
var fileInfo = new FileInfo(Path.Combine(m_directory, fileTransferStart.FileName));
FileIO.FromFile(fileInfo).Via(Flow.Create<ByteString>().Select(x => x.ToArray())).To(Sink.ActorRef<byte[]>(Sender, new FileTransferFinished())).Run(Context.Materializer()).ContinueWith(x =>
{
if (!x.Result.WasSuccessful)
{
Logger.ErrorException("Error during file transfer", x.Result.Error);
}
});
}
Now I want to limit the number of active file transfers to some number, e.g. 25.
Can I use the Throttle
functionality for that or do I have to implement something myself?
If throttle can be used, what are the parameters? In the source code I did not find any documentation which is understandable for me ;-).
Thanks in advance
Throttle
will only work on that file. The FileIO returns a bytestring per line i think.
Throttle
might be simple enough. There is plenty of docs available online about akka streams Throttle
Apparently today is the day for file IO questions.
I have a List of filenames... I'd like to spin up a new child per filename, then ask the child how many lines the particular file has, and then I'd like to add the line-counts of all the filenames to another list.
Looking at the Child Per Entity Pattern (https://gigi.nullneuron.net/gigilabs/child-per-entity-pattern-in-akka-net/), I feel like I can foreach filename, generate my child, and that's where I'm kinda stuck... If I Ask instead of Tell (as he uses Tell in the example) I can't realistically bundle it into a task list, can I? A task list can't really yield results, as far as I'm aware...
I've also tried a ForEachAsync (https://stackoverflow.com/a/42570099/1672020) idea, but the Context doesn't work inside that Action.
I'm assuming there's a better solution for what I wanna do, but I don't have the slightest on what that is.
If I Ask instead of Tell (as he uses Tell in the example) I can't realistically bundle it into a task list, can I?
It's possible. Something like this:
var tasks = new List<Task<int>>();
foreach(var filename in filenames)
{
var actor = CreateChild();
tasks.Add(actor.Ask(filename, TimeSpan.FromSeconds(10)));
}
int[] lineCounts = await Task.WhenAll(tasks.ToArray());
Task.WhenAll
. Documentation: https://msdn.microsoft.com/en-us/library/hh194874(v=vs.110).aspx
Tell can't return a result though, can it?
It can't. But child can send result using Tell to parent.
ReceiveAsync<RequestAllInstances>( HandleRequestAllInstancesAsync );
private async Task HandleRequestAllInstancesAsync( RequestAllInstances msg )
{
var sender = Sender;
var list = new List<A> { new A(), new A(), new A() };
await Task.Delay( 10000 ).ConfigureAwait( false );
// Sender.Tell( new RespondAllInstances( list.AsReadOnly() ) ); <-- this is going to crash with missing ActorContext
sender.Tell( new RespondAllInstances( list.AsReadOnly() ) ); // but if saved beforehand then it's ok
}
@Danthar Thanks for your help. I ended up with something like that:
public FileManager()
{
m_fileTransferQueue = Source.Queue<Tuple<FileTransferStart, IActorRef>>(25, OverflowStrategy.Backpressure).To(Sink.ForEach<Tuple<FileTransferStart, IActorRef>>(DoProcess)).Run(Context.Materializer());
}
protected override void PostStop()
{
base.PostStop();
m_fileTransferQueue.Complete();
}
private void OnReceive(FileTransferStart fileTransferStart)
{
m_fileTransferQueue.OfferAsync(new Tuple<FileTransferStart, IActorRef>(fileTransferStart, Sender));
}
private void DoProcess(Tuple<FileTransferStart, IActorRef> fileTransferStart)
{
var fileInfo = new FileInfo(Path.Combine(m_directory, fileTransferStart.Item1.FileName));
var transferTask = FileIO.FromFile(fileInfo).Via(Flow.Create<ByteString>().Select(x => x.ToArray())).To(Sink.ActorRef<byte[]>(fileTransferStart.Item2, new FileTransferFinished())).Run(Context.Materializer()).ContinueWith(x =>
{
if (!x.Result.WasSuccessful)
{
Logger.ErrorException("Error during file transfer", x.Result.Error);
}
});
// Run synchronous to have only 25 active items in queue?
transferTask.Wait();
}
Is it correct that the processing of the file must run synchronous to ensure only 25 items are processed at the same time?
Can anyone help me?
I'm looking for a way to combine Rest API with Akka.net in a Cluster System. I would not connect the rest api receiver inside the cluster,
but sending messages for the cluster group.
I saw something about recepcionist, but when I tried to use inside the lighthouse system, It occurred exception because it was required that
know all messages types.
Is there any way to use these things together with best performance?
var
would be much better than littering the code with comments.