Stijn Herreman
@stijnherreman
I did at some point when learning Akka but didn't really understand how to properly use it
AndreSteenbergen
@AndreSteenbergen
My bet is an Aggregator with a HashSet of actors waiting to start and an active set of current workers
Stijn Herreman
@stijnherreman
My concern is that it would require splitting up the single method in multiple parts, if I understand it correctly
Arjen Smits
@Danthar
If you want your actor to stop processing new messages while it is working on one message, you can combine Become (to switch states) with stashing: stash any new Bar message while busy, and once you're done, unstash everything and switch back to your initial behavior.
Regarding splitting it up: yes, it would.
You are splitting your async work across actors (or across several messages to the same actor).
So if you want to delete a lot of stuff asynchronously at the same time, you can use a worker pool and delegate that work to it, e.g. a worker pool as a child actor of your main actor.
That opens the door to using supervision strategies on that particular action. So if it fails, you can opt to retry, or simply ignore it.
In your current code, if it fails to delete 1 file out of the 1000s in your zip (for example), it will fail the entire message.
If that actor is then restarted and the message retried... well... you might have issues :P
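The Become plus stashing combination described above could be sketched roughly as follows. This is a minimal, untested sketch assuming Akka.NET's `ReceiveActor`, `IWithUnboundedStash` and `PipeTo`; `Bar` is the message name used in the chat, while `BarProcessor`, `WorkDone` and `DoWorkAsync` are hypothetical names introduced only for illustration.

```csharp
using System.Threading.Tasks;
using Akka.Actor;

// Message from the chat; its shape here is an assumption.
public sealed class Bar
{
    public Bar(string name) { Name = name; }
    public string Name { get; }
}

// Hypothetical internal message: signals that the async work has finished.
public sealed class WorkDone
{
    public WorkDone(string name, IActorRef replyTo) { Name = name; ReplyTo = replyTo; }
    public string Name { get; }
    public IActorRef ReplyTo { get; }
}

public class BarProcessor : ReceiveActor, IWithUnboundedStash
{
    // Populated by Akka.NET because the actor implements IWithUnboundedStash.
    public IStash Stash { get; set; }

    public BarProcessor()
    {
        Idle();
    }

    // Initial behavior: accept one Bar, start the work, then switch to Busy.
    private void Idle()
    {
        Receive<Bar>(bar =>
        {
            var replyTo = Sender; // capture now; Sender is not valid inside the task
            DoWorkAsync(bar, replyTo).PipeTo(Self);
            Become(Busy);
        });
    }

    // Busy behavior: stash everything except the completion (or failure) of the work.
    private void Busy()
    {
        Receive<WorkDone>(done =>
        {
            done.ReplyTo.Tell("processed " + done.Name);
            Stash.UnstashAll();
            Become(Idle);
        });
        Receive<Status.Failure>(failure =>
        {
            // PipeTo delivers a failed task as Status.Failure; resume normal processing.
            Stash.UnstashAll();
            Become(Idle);
        });
        ReceiveAny(_ => Stash.Stash());
    }

    // Stand-in for the real work (e.g. deleting files).
    private static async Task<WorkDone> DoWorkAsync(Bar bar, IActorRef replyTo)
    {
        await Task.Delay(100);
        return new WorkDone(bar.Name, replyTo);
    }
}
```

Piping the task result back to `Self` keeps all state changes inside the actor's own message handling, which is why the sender has to be captured before the work starts.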
Arjen Smits
@Danthar
@AndreSteenbergen you want to throttle the number of actors you are currently sending work to? Then yes, keeping track of how many you have contacted and how many have answered is the way to go.
And yes, using a HashSet of IActorRef would make that even easier.
AndreSteenbergen
@AndreSteenbergen
Thanks, I already started creating a ThrottledMessenger. It isn't that hard, but something like it might already exist somewhere in Akka.
AndreSteenbergen
@AndreSteenbergen
I guess this does what I need ...
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;

static class CollectionExtensions
{
    // Removes and returns an arbitrary "first" item, or default(T) if the collection is empty.
    public static T RemoveFirst<T>(this ICollection<T> items)
    {
        T item = items.FirstOrDefault();
        if (item != null)
        {
            items.Remove(item);
        }
        return item;
    }
}

public class ThrottledMessenger : ReceiveActor
{
    // Actors that have been sent the message and have not answered yet.
    private readonly HashSet<IActorRef> workerPool;
    // Actors still waiting for their turn.
    private readonly HashSet<IActorRef> waitingActors;
    private IActorRef originalSender;
    private object originalMessage;

    public ThrottledMessenger(int parallelWorkers, IEnumerable<IActorRef> refs)
    {
        workerPool = new HashSet<IActorRef>(refs.Take(parallelWorkers));
        waitingActors = new HashSet<IActorRef>(refs.Skip(parallelWorkers));

        // The first message received is the one to fan out.
        ReceiveAny(x =>
        {
            originalSender = Sender;
            originalMessage = x;

            foreach (var aref in workerPool) aref.Tell(x);
            Become(Messenging);
        });
    }

    private void Messenging()
    {
        // Every later message is treated as a reply from one of the workers.
        ReceiveAny(msg =>
        {
            if (workerPool.Remove(Sender))
            {
                // Forward the reply, keeping the worker as the sender.
                originalSender.Tell(msg, Sender);

                var nextActor = waitingActors.RemoveFirst();
                if (nextActor != null)
                {
                    workerPool.Add(nextActor);
                    nextActor.Tell(originalMessage);
                }
                else if (workerPool.Count == 0)
                {
                    // Nobody waiting and nobody working: we are done.
                    Context.Stop(Self);
                }
            }
        });
    }
}
Am I missing something?
It's acting like a proxy, I guess.
Arjen Smits
@Danthar
Why are you using a HashSet for waitingActors? A Queue seems like a better fit.
AndreSteenbergen
@AndreSteenbergen
Thanks, yes you are correct.
Queue would be better
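The Queue suggestion could look something like the sketch below. It is written without Akka types so that the bookkeeping stands on its own: a HashSet for the workers currently active and a Queue for those still waiting. `WorkerTracker` and its members are hypothetical names, not part of Akka.NET; in the actor above, `T` would be `IActorRef`.

```csharp
using System.Collections.Generic;

// Hypothetical helper: tracks which workers are active and which are still waiting.
public class WorkerTracker<T>
{
    private readonly HashSet<T> active = new HashSet<T>();
    private readonly Queue<T> waiting = new Queue<T>();

    public WorkerTracker(int parallelWorkers, IEnumerable<T> workers)
    {
        // The first parallelWorkers distinct workers start active; the rest wait in order.
        foreach (var worker in workers)
        {
            if (active.Count < parallelWorkers) active.Add(worker);
            else waiting.Enqueue(worker);
        }
    }

    public IReadOnlyCollection<T> Active => active;

    public bool IsDone => active.Count == 0 && waiting.Count == 0;

    // Marks a worker as finished. Returns true and sets next only when a
    // waiting worker was promoted to active; otherwise returns false.
    public bool TryComplete(T finished, out T next)
    {
        next = default(T);
        if (!active.Remove(finished) || waiting.Count == 0) return false;
        next = waiting.Dequeue();
        active.Add(next);
        return true;
    }
}
```

A Queue keeps the waiting workers in the order they were supplied and makes "take the next one" a single `Dequeue` call, so the `RemoveFirst` extension method and its null check are no longer needed.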
AndreSteenbergen
@AndreSteenbergen
@Danthar I was just looking at your repos on GitHub. Is your talk for .NET Zuid also available to stream somewhere?
Arjen Smits
@Danthar
nope
not recorded
sadly
AndreSteenbergen
@AndreSteenbergen
Too bad ;)
Arjen Smits
@Danthar
Yeah, in a way it is
The organizers did plan to record it
but because of a last-minute venue change it didn't happen
Arjen Smits
@Danthar
Anyway, in the end it turned out a lot tamer than planned. The description said it was for people who were already familiar with the basics of Akka. But when I stood in front of the group, nobody had even done the Akka hello-world :+
so I made some changes on the fly ^^
AndreSteenbergen
@AndreSteenbergen
Hmm (English would be better for our Gitter friends, sorry for starting in Dutch). Advanced Akka (Java) material is quite easy to find. The more advanced Akka.NET material tends to be introductions plus a pointer to read up on advanced topics elsewhere.
So basically you were forced by the audience to do the same :)
Sam13
@Sam13

Hello everybody. I have an actor which can be queried for files on its local filesystem. It opens the requested file and sends it to the requesting actor.
Example:

private void OnReceive(FileTransferStart fileTransferStart)
{
    var fileInfo = new FileInfo(Path.Combine(m_directory, fileTransferStart.FileName));
    FileIO.FromFile(fileInfo)
        .Via(Flow.Create<ByteString>().Select(x => x.ToArray()))
        .To(Sink.ActorRef<byte[]>(Sender, new FileTransferFinished()))
        .Run(Context.Materializer())
        .ContinueWith(x =>
        {
            if (!x.Result.WasSuccessful)
            {
                Logger.ErrorException("Error during file transfer", x.Result.Error);
            }
        });
}

Now I want to limit the number of active file transfers to some number, e.g. 25.
Can I use the Throttle functionality for that, or do I have to implement something myself?
If Throttle can be used, what are the parameters? I did not find any documentation in the source code that I could understand ;-).
Thanks in advance

Arjen Smits
@Danthar
Your stream is currently scoped to a single file.
So using Throttle will only affect that file. As far as I know, FileIO emits the file's content as ByteString chunks (8192 bytes each by default), not one per line.
If you want to throttle the amount of work being handled at once, you would have to do that at a higher level than in your example.
Arjen Smits
@Danthar
@Sam13 one way to do this with streams is to use a queue source: enqueue your FileTransferStart requests into it and connect your file-processing flow to that Source.
Then put something in between to limit execution,
as in, between the Source and your flow.
You might even do something really fancy and use a BidiFlow to build something custom.
But a Throttle might be simple enough. There is plenty of documentation available online about the Akka Streams Throttle.
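A rough, untested sketch of the queue-source idea, assuming Akka.Streams for .NET. Note that it swaps in `SelectAsync` with a parallelism of 25 rather than `Throttle`: `Throttle` limits the rate of elements over time, whereas `SelectAsync` bounds how many transfers are in flight at once, which is what the question asks for. `TransferPipeline`, `TransferRequest` (standing in for FileTransferStart) and the `transfer` delegate are hypothetical names; the buffer size of 1000 is an arbitrary assumption.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Dsl;

// Hypothetical request type standing in for FileTransferStart from the chat.
public sealed class TransferRequest
{
    public TransferRequest(string fileName) { FileName = fileName; }
    public string FileName { get; }
}

public static class TransferPipeline
{
    // Materializes a queue-backed stream that runs at most maxConcurrent
    // transfers at a time. The returned queue is used to submit requests.
    public static ISourceQueueWithComplete<TransferRequest> Start(
        IMaterializer materializer,
        Func<TransferRequest, Task<bool>> transfer, // assumed: performs one file transfer
        int maxConcurrent = 25)
    {
        return Source
            // Backpressure: OfferAsync completes only once there is room in the buffer.
            .Queue<TransferRequest>(1000, OverflowStrategy.Backpressure)
            // At most maxConcurrent transfer tasks are pending at any moment.
            .SelectAsync(maxConcurrent, transfer)
            .ToMaterialized(Sink.Ignore<bool>(), Keep.Left)
            .Run(materializer);
    }
}
```

Requests would then be submitted with `queue.OfferAsync(new TransferRequest("some-file"))`. One caveat of this sketch: with the default supervision strategy, a faulted transfer task fails the whole stream, so the delegate should catch its own errors and report them through its result.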
matneyx
@matneyx

Apparently today is the day for file IO questions.

I have a List of filenames... I'd like to spin up a new child per filename, then ask the child how many lines the particular file has, and then I'd like to add the line-counts of all the filenames to another list.

Looking at the Child Per Entity Pattern (https://gigi.nullneuron.net/gigilabs/child-per-entity-pattern-in-akka-net/), I feel like I can foreach filename, generate my child, and that's where I'm kinda stuck... If I Ask instead of Tell (as he uses Tell in the example) I can't realistically bundle it into a task list, can I? A task list can't really yield results, as far as I'm aware...

I've also tried a ForEachAsync (https://stackoverflow.com/a/42570099/1672020) idea, but the Context doesn't work inside that Action.

I'm assuming there's a better solution for what I want to do, but I don't have the slightest idea what that is.

matneyx
@matneyx
TLDR; How can I: ForEach item in a list, create a new child, ask the child a question and add the result to a second list... and do it all async 'cause the questions are complicated.
v1rusw0rm
@v1rusw0rm

If I Ask instead of Tell (as he uses Tell in the example) I can't realistically bundle it into a task list, can I?

It's possible. Something like this:

var tasks = new List<Task<int>>();
foreach (var filename in filenames)
{
  var actor = CreateChild();
  // Ask<int> states the expected reply type; plain Ask returns Task<object>
  tasks.Add(actor.Ask<int>(filename, TimeSpan.FromSeconds(10)));
}
int[] lineCounts = await Task.WhenAll(tasks);
But in your case I think it's better to use Tell instead of Ask. Do you create the children from inside an actor or from outside?
matneyx
@matneyx
From inside their shared parent... 1 parent creates multiple children
Tell can't return a result though, can it?
I think your code sample might work. :)
v1rusw0rm
@v1rusw0rm
I wrote it without testing, so adapt it to your needs. The key here is Task.WhenAll. Documentation: https://msdn.microsoft.com/en-us/library/hh194874(v=vs.110).aspx

Tell can't return a result though, can it?

It can't. But the child can send the result to the parent using Tell.

matneyx
@matneyx
And then you have a separate receive in the parent for the message coming back from the child?
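The Tell-based approach discussed here could be sketched as follows: the parent creates one child per filename, each child replies with its own message, and the parent has a separate Receive that collects the replies until all children have answered. This is an untested sketch assuming Akka.NET; all type names (`CountAll`, `CountLines`, `LineCount`, `AllCounts`, `LineCounter`, `CountingParent`) are hypothetical.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Akka.Actor;

public sealed class CountAll
{
    public CountAll(IReadOnlyList<string> fileNames) { FileNames = fileNames; }
    public IReadOnlyList<string> FileNames { get; }
}

public sealed class CountLines
{
    public CountLines(string fileName) { FileName = fileName; }
    public string FileName { get; }
}

public sealed class LineCount
{
    public LineCount(string fileName, int count) { FileName = fileName; Count = count; }
    public string FileName { get; }
    public int Count { get; }
}

public sealed class AllCounts
{
    public AllCounts(IReadOnlyDictionary<string, int> counts) { Counts = counts; }
    public IReadOnlyDictionary<string, int> Counts { get; }
}

// Child: counts the lines of one file, replies to its sender, then stops itself.
public class LineCounter : ReceiveActor
{
    public LineCounter()
    {
        Receive<CountLines>(msg =>
        {
            var count = File.ReadLines(msg.FileName).Count();
            Sender.Tell(new LineCount(msg.FileName, count));
            Context.Stop(Self);
        });
    }
}

// Parent: fans out one child per file and gathers the replies.
// Simplification: it handles one CountAll request at a time.
public class CountingParent : ReceiveActor
{
    private readonly Dictionary<string, int> counts = new Dictionary<string, int>();
    private IActorRef requester;
    private int pending;

    public CountingParent()
    {
        Receive<CountAll>(msg =>
        {
            requester = Sender;
            counts.Clear();
            pending = msg.FileNames.Count;
            if (pending == 0) requester.Tell(new AllCounts(new Dictionary<string, int>()));
            foreach (var name in msg.FileNames)
            {
                var child = Context.ActorOf(Props.Create(() => new LineCounter()));
                child.Tell(new CountLines(name)); // the parent is the implicit sender
            }
        });

        // The separate receive for results coming back from the children.
        Receive<LineCount>(msg =>
        {
            counts[msg.FileName] = msg.Count;
            if (--pending == 0)
            {
                requester.Tell(new AllCounts(new Dictionary<string, int>(counts)));
            }
        });
    }
}
```

The sketch leaves out failure handling: if a child throws (for example because a file is missing), the parent never receives that `LineCount`, so a real version would want a supervision strategy or a receive timeout.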