Aaron Stannard
@Aaronontheweb
no idea why Microsoft did that
but there it is
if you implement a custom serializer like Google Protobuf
or MsgPack
or just use that hack with JSON.NET
you'll be good to go
Chris Dewar-English
@babelchips
@Aaronontheweb Interesting. I think I’ll give that a go. Thanks.
Marc Piechura
@marcpiechura
@Tochemey I don’t think we have a HttpRequest component in akka.streams
chipdice
@chipdice
@Aaronontheweb I upgraded my projects to 1.3.8 with DotNetty 0.4.8 to fix our port exhaustion issue. Last night it looked like we were starting to eat up ports again, but I'm not sure it was the same issue. I've calculated that I should have 360 connections between the 3 different clusters that are running. What should I be looking for when we start to open a lot of ports?
AndreSteenbergen
@AndreSteenbergen
Are there any good design patterns for telling a command to a large group of actors (100k actors or so) and waiting for an answer from all of them? Like the Aggregator pattern @Horusiath blogged about in https://bartoszsypytkowski.com/dont-ask-tell-2/
Arjen Smits
@Danthar
@stijnherreman I don't have time to go into detail right now. But have you looked at the PipeTo extension method?
Basically what you would do is use PipeTo to send the result as a message back to yourself
and have a separate handler for that
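A minimal sketch of the PipeTo idea, assuming the work is an async file cleanup. The message types (DeleteFiles, DeleteFinished, DeleteFailed), the CleanupActor name and the DeleteAllAsync helper are hypothetical and only exist for this illustration.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using Akka.Actor;

// Hypothetical messages, named for this sketch only.
public sealed class DeleteFiles
{
    public DeleteFiles(string folder) { Folder = folder; }
    public string Folder { get; }
}

public sealed class DeleteFinished
{
    public DeleteFinished(int count) { Count = count; }
    public int Count { get; }
}

public sealed class DeleteFailed
{
    public DeleteFailed(Exception cause) { Cause = cause; }
    public Exception Cause { get; }
}

public class CleanupActor : ReceiveActor
{
    public CleanupActor()
    {
        Receive<DeleteFiles>(msg =>
        {
            // Start the async work and return immediately. The outcome comes
            // back to this actor as an ordinary message. Passing Sender keeps
            // the original requester as the sender of that message.
            DeleteAllAsync(msg.Folder).PipeTo(Self, Sender,
                success: count => new DeleteFinished(count),
                failure: ex => new DeleteFailed(ex));
        });

        // Separate handlers for the piped results.
        Receive<DeleteFinished>(done => Sender.Tell(done));
        Receive<DeleteFailed>(failed => Sender.Tell(failed));
    }

    // Stand-in for the real async work: deletes the files directly inside
    // the folder and returns how many were removed.
    private static Task<int> DeleteAllAsync(string folder)
    {
        return Task.Run(() =>
        {
            var files = Directory.GetFiles(folder);
            foreach (var file in files) File.Delete(file);
            return files.Length;
        });
    }
}
```

The actor never awaits inside a handler, so it stays free to process other messages while the task runs.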
Stijn Herreman
@stijnherreman
I did at some point when learning Akka but didn't really understand how to properly use it
AndreSteenbergen
@AndreSteenbergen
My bet is an Aggregator with a HashSet of actors waiting to start and an active set of current workers
Stijn Herreman
@stijnherreman
My concern is that it would require splitting up the single method into multiple parts, if I understand it correctly
Arjen Smits
@Danthar
If you want your actor to block processing while it's working on one message, you can use a combination of Become to switch states and stashing to stash any new Bar message. Once you're done, you unstash all and switch back to your initial behavior.
Regarding splitting up: yes, it would.
You are splitting up your async work across actors (or across several messages within the same actor).
So if you want to async delete a lot of stuff at the same time, you can use a worker pool and delegate that work to it, e.g. a worker pool as a child actor of your main actor.
That opens the door to using supervision strategies on that particular action. So if it fails, you can opt to retry, or simply ignore it.
In your current code, if it fails to delete 1 file out of the 1000s in your zip (for example), it will fail the entire message.
If that actor is then restarted and the message retried... well... you might have issues :P
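The Become plus stashing approach described above could look roughly like this. It is a sketch under assumptions: the shape of the Bar message, the WorkDone and WorkFailed messages, the OneAtATimeActor name and the ProcessAsync helper are all made up for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;

// Hypothetical messages, named for this sketch only.
public sealed class Bar
{
    public Bar(string name) { Name = name; }
    public string Name { get; }
}

public sealed class WorkDone
{
    public WorkDone(string name) { Name = name; }
    public string Name { get; }
}

public sealed class WorkFailed
{
    public WorkFailed(Exception cause) { Cause = cause; }
    public Exception Cause { get; }
}

public class OneAtATimeActor : ReceiveActor, IWithUnboundedStash
{
    // Filled in by Akka because the actor implements IWithUnboundedStash.
    public IStash Stash { get; set; }

    public OneAtATimeActor()
    {
        Idle();
    }

    // Initial behavior: accept one Bar, start the work, switch to Busy.
    private void Idle()
    {
        Receive<Bar>(bar =>
        {
            ProcessAsync(bar).PipeTo(Self, Sender,
                success: name => new WorkDone(name),
                failure: ex => new WorkFailed(ex));
            Become(Busy);
        });
    }

    // While busy: park every new Bar until the current one has finished.
    private void Busy()
    {
        Receive<Bar>(_ => Stash.Stash());
        Receive<WorkDone>(done => { Sender.Tell(done); Finish(); });
        Receive<WorkFailed>(failed => { Sender.Tell(failed); Finish(); });
    }

    // Put the stashed messages back into the mailbox and accept work again.
    private void Finish()
    {
        Stash.UnstashAll();
        Become(Idle);
    }

    // Stand-in for the real async work.
    private static async Task<string> ProcessAsync(Bar bar)
    {
        await Task.Delay(100);
        return bar.Name;
    }
}
```

Stashed messages keep their original sender, so replies still reach the right requester after unstashing.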
Arjen Smits
@Danthar
@AndreSteenbergen you want to throttle the number of actors you are currently sending work to? Then yes, keeping track of how many you have contacted and how many have answered is the way to go.
And yes, using a HashSet of IActorRef would make that even easier.
AndreSteenbergen
@AndreSteenbergen
Thanks, I already started creating a ThrottledMessenger. It isn't that hard, but it might already exist somewhere in Akka.
AndreSteenbergen
@AndreSteenbergen
I guess this does what I need ...
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;

static class CollectionExtensions
{
    // Removes and returns an arbitrary first item, or default(T) when empty.
    public static T RemoveFirst<T>(this ICollection<T> items)
    {
        T item = items.FirstOrDefault();
        if (item != null)
        {
            items.Remove(item);
        }
        return item;
    }
}

// Forwards one message to many actors, with at most parallelWorkers
// of them working at the same time.
public class ThrottledMessenger : ReceiveActor
{
    private readonly HashSet<IActorRef> workerPool;     // currently working
    private readonly HashSet<IActorRef> waitingActors;  // not yet contacted
    private IActorRef originalSender;
    private object originalMessage;

    public ThrottledMessenger(int parallelWorkers, IEnumerable<IActorRef> refs)
    {
        workerPool = new HashSet<IActorRef>(refs.Take(parallelWorkers));
        waitingActors = new HashSet<IActorRef>(refs.Skip(parallelWorkers));

        // The first message is the command to distribute.
        ReceiveAny(x =>
        {
            originalSender = Sender;
            originalMessage = x;

            foreach (var aref in workerPool) aref.Tell(x);
            Become(Messenging);
        });
    }

    // From here on, every message is treated as a reply from a worker.
    private void Messenging()
    {
        ReceiveAny(msg =>
        {
            if (workerPool.Remove(Sender))
            {
                // Pass the reply on, keeping the worker as the sender.
                originalSender.Tell(msg, Sender);

                var nextActor = waitingActors.RemoveFirst();
                if (nextActor != null)
                {
                    workerPool.Add(nextActor);
                    nextActor.Tell(originalMessage);
                }
                else if (workerPool.Count == 0)
                {
                    // Everyone has answered.
                    Context.Stop(Self);
                }
            }
        });
    }
}
Am I missing something?
It's acting like a proxy, I guess
Arjen Smits
@Danthar
Why are you using a HashSet for waitingActors? A Queue seems like a better fit.
AndreSteenbergen
@AndreSteenbergen
Thanks, yes you are correct.
Queue would be better
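A sketch of the same actor with the suggested change applied: the waiting actors move to a Queue, while the active set stays a HashSet because it is looked up by sender. The class name QueuedThrottledMessenger is made up for this illustration. Unlike a HashSet, a Queue does not drop duplicate refs, so this version assumes the refs passed in are distinct.

```csharp
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;

public class QueuedThrottledMessenger : ReceiveActor
{
    private readonly HashSet<IActorRef> active = new HashSet<IActorRef>();
    private readonly Queue<IActorRef> waiting;
    private IActorRef originalSender;
    private object originalMessage;

    public QueuedThrottledMessenger(int parallelWorkers, IEnumerable<IActorRef> refs)
    {
        var all = refs.ToList(); // enumerate the input only once
        active.UnionWith(all.Take(parallelWorkers));
        waiting = new Queue<IActorRef>(all.Skip(parallelWorkers));

        // The first message is the command to distribute.
        ReceiveAny(message =>
        {
            originalSender = Sender;
            originalMessage = message;
            foreach (var worker in active) worker.Tell(message);
            Become(Collecting);
        });
    }

    private void Collecting()
    {
        ReceiveAny(reply =>
        {
            // Ignore replies from actors that are not currently active.
            if (!active.Remove(Sender)) return;

            originalSender.Tell(reply, Sender);

            if (waiting.Count > 0)
            {
                // Dequeue is O(1) and keeps the original order.
                var next = waiting.Dequeue();
                active.Add(next);
                next.Tell(originalMessage);
            }
            else if (active.Count == 0)
            {
                Context.Stop(Self);
            }
        });
    }
}
```

With the Queue in place, the RemoveFirst extension method is no longer needed.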
AndreSteenbergen
@AndreSteenbergen
@Danthar just looking at your repos on GitHub. Is your talk for .NET Zuid also streamable somewhere?
Arjen Smits
@Danthar
nope
not recorded
sadly
AndreSteenbergen
@AndreSteenbergen
too bad ;)
Arjen Smits
@Danthar
well, yeah, in a way
the organizers did plan to record it
but it didn't happen because of a last-minute venue change
Arjen Smits
@Danthar
Anyway, in the end it turned out a lot tamer than planned. The description said it was for people who already knew the basics of Akka. But when I stood in front of the group, nobody had even done the Akka hello-world :+
so I made some changes on the fly ^^
AndreSteenbergen
@AndreSteenbergen
hummm (English would be better for our gitter friends, sorry for starting in Dutch), Advanced Akka (Java) material is quite easy to find. For Akka.NET, most material is introductory, with a reference to read up on the more advanced stuff
basically you were forced by the audience to do the same :)
Sam13
@Sam13

Hello everybody. I have an actor which can be queried for files on its local filesystem. It will open the desired file and send it to the requesting actor.
Example:

private void OnReceive(FileTransferStart fileTransferStart)
{
    var fileInfo = new FileInfo(Path.Combine(m_directory, fileTransferStart.FileName));
    FileIO.FromFile(fileInfo)
        .Via(Flow.Create<ByteString>().Select(x => x.ToArray()))
        .To(Sink.ActorRef<byte[]>(Sender, new FileTransferFinished()))
        .Run(Context.Materializer())
        .ContinueWith(x =>
        {
            if (!x.Result.WasSuccessful)
            {
                Logger.ErrorException("Error during file transfer", x.Result.Error);
            }
        });
}

Now I want to limit the number of active file transfers to some number, e.g. 25.
Can I use the Throttle functionality for that or do I have to implement something myself?
If throttle can be used, what are the parameters? In the source code I did not find any documentation which is understandable for me ;-).
Thanks in advance

Arjen Smits
@Danthar
your stream is now scoped to a single file
So using Throttle will only work on that file. FileIO emits the file's contents as ByteString chunks (fixed-size chunks rather than lines, as far as I know).
If you want to throttle the amount of work currently being handled, you would have to handle that at a higher level than in your example.
Arjen Smits
@Danthar
@Sam13 how you could do this with streams is by using a queue source, in which you enqueue your FileTransferStart requests, and connect your file processing flow to that Source.
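A rough sketch of the queue source idea, not a definitive implementation. It assumes the message shapes shown here, a hypothetical FileServerActor with a private Transfer envelope, a buffer size of 1000, and that a transfer counts as finished when the file has been read completely. The two-argument Sink.ActorRef call mirrors the snippet above and may differ in newer Akka.NET versions.

```csharp
using System.IO;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

// Assumed message shapes, based on the names used in the question.
public sealed class FileTransferStart
{
    public FileTransferStart(string fileName) { FileName = fileName; }
    public string FileName { get; }
}

public sealed class FileTransferFinished { }

public class FileServerActor : ReceiveActor
{
    // Hypothetical envelope: the file to send and who asked for it.
    private sealed class Transfer
    {
        public Transfer(FileInfo file, IActorRef receiver) { File = file; Receiver = receiver; }
        public FileInfo File { get; }
        public IActorRef Receiver { get; }
    }

    private const int MaxParallelTransfers = 25;
    private readonly ISourceQueueWithComplete<Transfer> queue;

    public FileServerActor(string directory)
    {
        var materializer = Context.Materializer();

        // One long-lived stream for all requests. SelectAsyncUnordered keeps
        // at most MaxParallelTransfers file streams running at once.
        queue = Source.Queue<Transfer>(1000, OverflowStrategy.DropNew)
            .SelectAsyncUnordered(MaxParallelTransfers, t => Send(t, materializer))
            .To(Sink.Ignore<bool>())
            .Run(materializer);

        Receive<FileTransferStart>(start =>
        {
            var file = new FileInfo(Path.Combine(directory, start.FileName));
            // The offer result (enqueued or dropped) is ignored in this
            // sketch; real code should inspect it.
            queue.OfferAsync(new Transfer(file, Sender));
        });
    }

    // Streams one file to the requester. The returned task yields false
    // instead of faulting, so one bad file does not stop the outer stream.
    private static Task<bool> Send(Transfer transfer, IMaterializer materializer)
    {
        return FileIO.FromFile(transfer.File)
            .Select(chunk => chunk.ToArray())
            .To(Sink.ActorRef<byte[]>(transfer.Receiver, new FileTransferFinished()))
            .Run(materializer)
            .ContinueWith(task =>
                task.Status == TaskStatus.RanToCompletion && task.Result.WasSuccessful);
    }
}
```

Requests beyond the 25 running transfers wait in the queue's buffer. Once that buffer is full, DropNew discards further requests, which is why the offer result is worth checking.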