Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Activity
    Arjen Smits
    @Danthar
    Am i forgetting something here? Or am i making the wrong assumptions ?
    image.png
    AndreSteenbergen
    @AndreSteenbergen
    I am waiting on the new 1.0 for kafka. After that I will create a new PR for kafka streams as well. I guess Akka 1.3.11 will be sooner then Kafka, so I will base it on Akka 1.3.11 when the time comes. There is quite some discussion going on about the .net api. I have it working on beta2, but a beta3 is imminent, with breaking changes.
    Marc Piechura
    @marcpiechura
    @Danthar has foreach a overload which supports Func<Task> ?
    Arjen Smits
    @Danthar
    i think it does?
    Marc Piechura
    @marcpiechura
    Could be Action
    Then your task would handle the exception
    Arjen Smits
    @Danthar
    ill have to check.
    Marc Piechura
    @marcpiechura
    But donÔÇÖt know why your app would crash ^^
    Arjen Smits
    @Danthar
    in meeting atm ill get back this aspa
    asap
    myeah app crashes cause stream propagates exception up
    Marc Piechura
    @marcpiechura
    mh, strange, Sink.Foreach takes a Action<TIn>
    Sink<TIn, Task> ForEach<TIn>(Action<TIn> action)
    so I would expect that the async creates a task where your exception is thrown and simply "returns" a failed tasked, but maybe the compiler is smart enough to reduce it to a simple action since you don't await anything
    Arjen Smits
    @Danthar
    yeah the throw exception is a placeholder. you can see the await indexer.blah
    commented out
    i can try to remove the async and see if that changes anything
    or use a stub indexer that throws an exception
    Marc Piechura
    @marcpiechura
    or use .Via(Flow.SelectAsync(throw)).To(Sink.Ignore)
    Arjen Smits
    @Danthar
    yeah that crossed my mind as well.
    Marc Piechura
    @marcpiechura
    just to make sure no strange things are happening with async await
    Arjen Smits
    @Danthar
    Ok yeah its an async thing
    if i remove the async keyword, it restarts as expected
    Marc Piechura
    @marcpiechura
    ­čĹŹ
    Anton Fernando
    @anton55_gitlab
    anyone know how to create a akka stream using mqtt, basically anything published to a mqtt channel , akka stream should spit out?
    Aravindan
    @aravindanck
    Hello Guys, I'm beginner here. Would like to know your thoughts on this:
    What is the maximum number of actors (which uses Akka streams API to consume data from an infinite source) that we can create inside a single Actor system? I'm in need of creating a huge number of actors for my use case that I'm working.
    Bartosz Sypytkowski
    @Horusiath

    @aravindanck in general this is a good question for (https://gitter.im/akkadotnet/akka.net - akka .NET version) or (https://gitter.im/akka/akka - akka JVM version):

    The simple answer is: millions - the pure actor overhead is several hundred of bytes. This doesn't count any sort of logic and state you're keeping inside them. But if you want some more advanced features, like persistence, this will require additional memory per actor + time spend on recovery/store process, which is heavily dependent on the actual persistence provider you're using - usually this is the major limiting issue aside of the hardware requirements.

    Aravindan
    @aravindanck
    Thanks for the reply @Horusiath. Yes, I'm trying this out in JVM. I'm creating actors per subscription (the system is expected to have subscriptions in millions). Each actor would consume data from an infinite source (like Kafka) - I'm thinking of using a Rx implementation here. Each of the actor would perform some asynchronous push operation (like pushing to an URL) for each of the consumed data (this is based on subscription). What I'm not clear is how does it manage a huge number of actors with a bunch of threads?
    As far as the system I'm building, I'm not looking at using any sort of persistence, but resilience is a must i.e., how effective would it be to create a huge number of actors in a single system vs options to distribute the actors across cluster nodes somehow?
    Sean Farrow
    @SeanFarrow

    Hi,

    I've got a system that fires two .Net events.
    One when an item appears on the connection and the other if the connection to the target ssystem is dropped for any reason. What is the easiest way of converting both of these event handlers to a single stream?

    Bartosz Sypytkowski
    @Horusiath
    @SeanFarrow you can wrap them both with another type of message.
    Boban
    @bobanco
    @Horusiath , @marcpiechura do you have an idea how i can complete the stream based on a condition, on example if i have a source which emits elements and lets say after the 10th element or if the element contains some value which after that i dont want to continue and i want to complete the stream, i know i can break the stream with exception but i need to gracefully complete the stream, because i dont want to lose the inflight items which are processing because am also using SelectAsync with some degree of paralelisam.
    Emilian BALANESCU
    @emilianb

    Hi,
    I'm new with Akka.NET streams and I'm experiencing a strange behavior. I try to implement parallelism on a part of a Akka.NET Stream using a Balancer stage and a Merge stage. The strange behaviour is that I am not getting any speed increase in the processing of the stream. My impression is that it does not spawn multiple threads (one for each parallel path) as I had expected.

    Can you please give me a hint if there is some configuration settings that I'm missing?

    AndreSteenbergen
    @AndreSteenbergen
    @bobanco you can use TakeWhile it takes a predicate and stops the stream after the the first false
    Boban
    @bobanco
    @AndreSteenbergen thank you, will give it a shot!
    acds
    @acds
    Hi All, I'm learning and hope you all can help so I can help back in the future... have a stream process that I want to split across actors, with an Source.ActorRef in the coordinator actor that takes a message an transforms it (with a number of stages), resulting in a BroadcastHub that, upon creation of subordinate Actors (that materialize there own streams with their own Source.ActorRef's), would runForeach the broadcast hub to a Sink.ActorRef (to the subordinate Actor) to link things together.... has anyone ever tried this (expensive googling only gives me Scala results)?
    acds
    @acds
    @AndreSteenbergen interested in what you are doing here, please keep us all in the loop.
    acds
    @acds
    @acds Also per above if you nest a Source.ActorRef how do you get the materialized IActorRef (that you can then Tell/Ask or use ActorRefWithAck - to support backpresure)?
    Mark Tkachenko
    @Rizzen
    Hello! I'm using Akka.Streams.Amqp package and it seems little outdated - it throws exception on SetHandler, but version from sources doesnt. Question is - when I can expect new version of package in Nuget? Thank you!
    Ufuk Hac─▒o─čullar─▒
    @uhaciogullari
    Hi everyone
    I had a question about Akka streams that I posted on StackOverflow but it didn't get much attention there.
    I wanted to try my chance here as well :)
    Bartosz Sypytkowski
    @Horusiath
    @uhaciogullari I've made some custom stage working on that case here: https://gist.github.com/Horusiath/f988ab999d26b7f25d2d1f72834fbcb6
    Ufuk Hac─▒o─čullar─▒
    @uhaciogullari
    @Horusiath Thank you for looking into it. I'll test it when I get home.
    Bartosz Sypytkowski
    @Horusiath
    it also covers one thing, that your original sample code didn't - in your example if you await inside your if statement, the entire processing is blocked until the request completes
    this is poor utilization of batch processing
    my sample calls the async function, but doesn't wait for it to complete, so if buffer was not empty it still can process elements in the meantime
    Ufuk Hac─▒o─čullar─▒
    @uhaciogullari
    @Horusiath Great point. I was trying to get it to work before optimizing that part so you answered my next question too. Thanks a lot :)
    Ufuk Hac─▒o─čullar─▒
    @uhaciogullari
    @Horusiath I tried your snippet but it seems like there's a bug. The first fetch works fine but there are concurrent fetches on the second time.
    I tweaked it a bit(mainly was pulled flag) but couldn't get it to work. Any idea what may be wrong?
    Bartosz Sypytkowski
    @Horusiath
    @uhaciogullari maybe your fetch method returns less items, than the threshold you've specified?
    ah, I see - eg. treshold = 10, queue has 9 items. Source gets pulled. Since queue has less items than threshold, we call fetch and return next item from the queue. But then source is pulled again. Queue now has 8 items (still less than treshold), but previous fetch didn't finish. So it calls it again.
    Bartosz Sypytkowski
    @Horusiath
    I've updated the gist - just needed to add extra flag to guard fetch calls.
    Ufuk Hac─▒o─čullar─▒
    @uhaciogullari
    @Horusiath Works perfectly! Thanks a lot for your help.
    Would you like to post your answer on StackOverflow also?
    Prahveen Thiruchelvam
    @prahveen
    Hi, I'm trying to create akka source for handle ably messages with backpressure .. but still I couldn't understand how akka source is working . can any one help me