Marco
@coldwarrl
The docs mention that "It is possible to pass the original sender inside the reply messages if the client is supposed to communicate directly to the actor in the cluster." but that does not seem to work for me. I use the cluster client to get a message containing an actor reference (IActorRef) from the cluster. When I try to send a message to that very actor I get a dead letter, and the client even loses contact with the cluster. Are the docs wrong here, or is my understanding wrong?
(communicating via the ClusterClient.Send* works fine)
voltcode
@voltcode
Hi guys, is there a way to obtain the event timestamp from Akka.Persistence? I see there's a timestamp column in the schema, but the EventEnvelope that's available in SqlReadJournal does not have that field
I'm running the CurrentEventsByPersistenceId query and I'd love to avoid a timestamp field for my events if there's already one in the database
voltcode
@voltcode
that's using SqlReadJournal
Ismael Hamed
@ismaelhamed
@voltcode we need to port akka/akka#28383
voltcode
@voltcode
thanks @ismaelhamed, should I open a ticket for this in the akka.net repo? I'm not sure how to proceed
huzaifak
@huzaifak
@/all we are using Hyperion to serialize and deserialize our objects; our app is .NET Framework based. We use Distributed Data and we are running into an issue where the constructor call is skipped during deserialization, which causes problems because we do some initialization in the constructor. Is there a way to enforce the constructor call, or to have a surrogate called to perform some init operations?
huzaifak
@huzaifak
we do implement ISerializable on the class in question here
hflorin
@hflorin
How can I use Tell instead of Ask in a web API that has to return the result of the operation to the caller?
Ismael Hamed
@ismaelhamed
@voltcode you could open an issue and reference that PR to create awareness. I have it in the backlog, but my time for contributions is limited
AndreSteenbergen
@AndreSteenbergen
Hi guys, I hope someone can help me. I have created a persistence journal based on Kafka filling a RocksDB journal. During the tests I can't get the test case to call PostStop on the journal. I have created a public repo. I guess I am missing something small, but I can't seem to find my problem. The repo I made is here: https://github.com/AndreSteenbergen/Akka.Persistence.KafkaRocks
When calling dotnet test, the first one crashes because I haven't implemented deleting messages yet; the second just says "Test run in progress." It seems the process just won't stop.
Chartlog
@ChartlogInc_twitter
Hello guys, I'm having an issue creating a ConsistentHashing pool router. It complains about duplicate keys, which should not be possible.
Couldn't route message with consistent hash key [882f862b-a502-4289-b1f1-fca9a9e1f3c8] due to [An item with the same key has already been added. Key: [2135871908, akka://TradeProcessingSystem/user/$a/$a/$a/$Drc]]
I am debugging and this is the only message to pass through the system. I've pretty much followed the docs on how to do this
var props = Context.DI().Props<ProcessUserSessionActor>()
    .WithRouter(new ConsistentHashingPool(10000)
        .WithHashMapping(b => ((ProcessUserSession)b).UserId.ToString()));

Receive<ProcessUserSession>(a =>
{
    var userSessionProcessor = Context.ActorOf(props);
    userSessionProcessor.Tell(a);
});
me-slove
@me-slove
we were using an external seed node system and it died during a patch cycle, and nobody noticed it didn't restart properly. the issue we have seen, though, is that the other cluster apps are leaking sockets trying to reconnect to the seed node. has anybody else seen something like this? closing the app and restarting releases them, of course, but that's essentially manual
Chartlog
@ChartlogInc_twitter
I figured it out. I was under the impression that creating the props above would create a pool of max 10000 instances but it actually creates 10000 instances on Context.ActorOf when it receives the first message. It seems that if my key is the number 1 then it creates 10,000 instances with hash key 1. I have the nrOfInstances variable set to 1 so I'm going to have to test it to see if the pool isn't capped at 1 routee.
to11mtm
@to11mtm

@ChartlogInc_twitter : Maybe not the best analogy but I like to think of Props as like a 'serialized constructor call'. The props don't do anything until they are used in an ActorOf type context. (This is how remote deployment functions; the Props are sent over the wire to the system the actors will run on.)

Without trying to make too many assumptions about what you're trying to do, I'd suggest declaring something like:

private IActorRef _userSessionProcessorPool;

in your class, and then placing _userSessionProcessorPool = Context.ActorOf(props); outside of your receive method (but still inside the constructor.)
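
A minimal sketch of that suggestion, under some assumptions: the message and worker classes are hypothetical stand-ins for the ones mentioned above, Props.Create replaces the DI-based Props so the snippet needs only the Akka package, and the pool size of 10 is arbitrary. The point is only that the pool is created once, in the constructor, and every message is routed through the same router.

```csharp
using System;
using Akka.Actor;
using Akka.Routing;

// Hypothetical message type, modelled on the ProcessUserSession message above.
public sealed class ProcessUserSession
{
    public ProcessUserSession(Guid userId) { UserId = userId; }
    public Guid UserId { get; }
}

// Hypothetical worker; stands in for ProcessUserSessionActor.
public sealed class ProcessUserSessionActor : ReceiveActor
{
    public ProcessUserSessionActor()
    {
        Receive<ProcessUserSession>(msg => { /* process one user's session here */ });
    }
}

// Hypothetical parent that owns the pool.
public sealed class SessionCoordinator : ReceiveActor
{
    private readonly IActorRef _userSessionProcessorPool;

    public SessionCoordinator()
    {
        // Props only describe the router; nothing is started yet.
        var props = Props.Create(() => new ProcessUserSessionActor())
            .WithRouter(new ConsistentHashingPool(10) // arbitrary size, an assumption
                .WithHashMapping(m => ((ProcessUserSession)m).UserId.ToString()));

        // The router and its routees are started once, here.
        _userSessionProcessorPool = Context.ActorOf(props, "user-session-pool");

        // Messages with the same UserId are routed to the same routee.
        Receive<ProcessUserSession>(msg => _userSessionProcessorPool.Forward(msg));
    }
}
```

With this shape, Context.ActorOf runs once per coordinator instance instead of once per incoming message, so no new pool is spawned for each ProcessUserSession.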

nathvi
@nathvi
Hello all
:)
Chartlog
@ChartlogInc_twitter
@to11mtm roger, that's how I ended up doing it. But just to give you some info, I was just trying to create one actor per user so that when a user uploads their data, it's guaranteed there will be no race conditions
Hello @nathvi
Bartosz Sypytkowski
@Horusiath
@ChartlogInc_twitter if you want 1-1 mapping between some entity and actor, this is what Akka.Cluster.Sharding is for
Ben Foster
@benfosterdev_twitter
Is it possible for a parent actor to be notified when a child actor is restarted? I need to resend a "start" message from the parent as currently the child restarts but sits in an idle state.
Ben Foster
@benfosterdev_twitter
I solved the above :)
Ben Foster
@benfosterdev_twitter
Second question: it looks like Akka.NET does not honour singleton-scoped services when using Microsoft DI
When using DI().Props to create an actor instance, any singleton instances are recreated
Chartlog
@ChartlogInc_twitter
Ben, you should not be using singletons in your actor systems. You should hand the responsibility for lifecycle management over to your actors. The only singletons I ever use in my Akka projects are the actor systems themselves. So if you need some service to be a singleton, then chances are you need an actor around that service that is always alive
Chartlog
@ChartlogInc_twitter
@benfosterdev_twitter
Ben Foster
@benfosterdev_twitter
@ChartlogInc_twitter in this case, it's the Amazon SDK which, similar to HttpClient Factory, should be a singleton. I wanted to avoid creating a bottleneck by pushing all AWS operations through a single actor

New one for today:

private static SupervisorStrategy ProducerSupervisorStrategory = new OneForOneStrategy(
    maxNrOfRetries: 5,
    withinTimeRange: TimeSpan.FromSeconds(30),
    localOnlyDecider: ex => Directive.Stop
);

_sqsProducer = Context.ActorOf(Context.DI().Props<SqsProducer>().WithSupervisorStrategy(ProducerSupervisorStrategory), "producer");

Supervisor strategy makes no difference when the actor throws an exception. It restarts indefinitely. I even tried swapping out for StoppingSupervisorStrategy and it does the same.
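
One possible explanation, offered as an assumption rather than a confirmed diagnosis: in Akka a child's failures are handled by the strategy its parent returns from SupervisorStrategy(), while a strategy attached through Props.WithSupervisorStrategy appears to become the created actor's own strategy for its children. If that is what happens here, the parent is still using the default strategy, which restarts the producer. A minimal sketch of declaring the strategy on the parent instead (ProducerParent and the simplified SqsProducer are hypothetical, and Props.Create replaces the DI-based Props):

```csharp
using System;
using Akka.Actor;

// Hypothetical stand-in for the real SqsProducer.
public sealed class SqsProducer : ReceiveActor
{
    public SqsProducer()
    {
        Receive<string>(msg =>
        {
            // Simulated failure for illustration only.
            if (msg == "fail")
                throw new InvalidOperationException("simulated failure");
        });
    }
}

// Hypothetical parent actor that supervises the producer.
public sealed class ProducerParent : ReceiveActor
{
    private readonly IActorRef _sqsProducer;

    public ProducerParent()
    {
        _sqsProducer = Context.ActorOf(Props.Create(() => new SqsProducer()), "producer");
        Receive<string>(msg => _sqsProducer.Forward(msg));
    }

    // The parent's strategy decides what happens when a child throws.
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: 5,
            withinTimeRange: TimeSpan.FromSeconds(30),
            localOnlyDecider: ex => Directive.Stop);
    }
}
```

With Directive.Stop as the only decision, the producer should be stopped on its first exception, so the retry limits only matter if the decider ever returns Directive.Restart.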

glikoz
@glikoz
Hi guys, I am implementing an algorithmic trading infrastructure with Akka.NET. It is complete, but I am planning to make some changes to the system. Until now, a TickEvent (raised when new price information arrives in my system) was TELLed to two actor groups. One of them is the Virtual Broker (handling something) and the other is the Strategies related to this TickEvent (the relation is the symbol information). Both groups can TELL a NewOrderEvent independently. Now I want to synchronize these NewOrderEvents: when a TickEvent comes in, it is TELLed to the actors, and all resulting NewOrders are collected at one point and processed together. I don't want to use the blocking ASK approach. How can I implement this kind of requirement?
Ondrej Pialek
@ondrejpialek
Hi @glikoz, you need to aggregate the responses and then emit just one. You do this by inserting a new actor between those producers and the ultimate consumer, and have it wait for a while to collect the NewOrderEvents, with some logic in there to decide what it sends further down. (E.g. wait for a while for both responses and somehow aggregate them, or simply let the first event be sent out and discard the second one, etc.) Here is a detailed description of the pattern (it has Java code examples, but it shouldn't be too hard to reproduce in C#): https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html#general-purpose-response-aggregator
Alternatively you could look at Akka Streams, that have many of these common operations available to you out of the box
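A rough C# sketch of the aggregator idea described above, using classic Akka.NET actors. All type names here (NewOrderEvent, OrdersForTick, OrderAggregator) are hypothetical, and the completion rule (a fixed number of expected replies, or an idle timeout) is just one of the options mentioned, not a prescribed design:

```csharp
using System;
using System.Collections.Generic;
using Akka.Actor;

// Hypothetical reply sent by the broker and by each strategy.
public sealed class NewOrderEvent
{
    public NewOrderEvent(string symbol) { Symbol = symbol; }
    public string Symbol { get; }
}

// Hypothetical combined result emitted once per tick.
public sealed class OrdersForTick
{
    public OrdersForTick(IReadOnlyList<NewOrderEvent> orders) { Orders = orders; }
    public IReadOnlyList<NewOrderEvent> Orders { get; }
}

// One short-lived aggregator per tick: collects replies, emits one message, stops.
public sealed class OrderAggregator : ReceiveActor
{
    private readonly List<NewOrderEvent> _collected = new List<NewOrderEvent>();
    private readonly IActorRef _consumer;

    public OrderAggregator(int expectedReplies, TimeSpan idleTimeout, IActorRef consumer)
    {
        _consumer = consumer;

        // Fires when no message has arrived for idleTimeout.
        // Note: this is an idle timeout, it resets on every received message.
        Context.SetReceiveTimeout(idleTimeout);

        Receive<NewOrderEvent>(order =>
        {
            _collected.Add(order);
            if (_collected.Count >= expectedReplies)
                Flush();
        });

        Receive<ReceiveTimeout>(_ => Flush());
    }

    private void Flush()
    {
        // Send whatever was collected as a single message, then stop.
        _consumer.Tell(new OrdersForTick(_collected.ToArray()));
        Context.Stop(Self);
    }
}
```

A coordinator could create one aggregator per tick and pass it as the sender, for example strategy.Tell(tick, aggregator), so that replies sent to Sender end up in the aggregator without any Ask.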
glikoz
@glikoz
thx @ondrejpialek, I had prepared this before your response. Do you know of any Akka Streams sample I could use as a quickstart for my case? Thx again.
image.png
glikoz
@glikoz
I will follow the implementation at https://getakka.net/articles/streams/pipeliningandparallelism.html, but first I have to implement a Streamer (it converts the async events that come from an external system into an Akka stream)
volgad
@volgad

@glikoz Hi. Depending on how many messages you receive and need to handle, Akka Streams could make your task much easier, in particular for maintaining the timeline / order / consistency of the different events you receive (for multiple instruments, I assume).

Akka Streams is also amazing at controlling the stream when you replay data and want to go as fast as possible, as opposed to live, where you do not control the flow speed (and which will most likely be slower than when you replay data yourself).

Based on my experience, the operators that I found very useful for the transformations are:

new AccumulateWhileUnchanged (from alpakka) to group batches of events and aggregate them, say per time or per event order
throttle to control the flow of your batches of events
scan or scanAsync to transform and update (similar to a fold where every step or transform is returned, and which can run on an arbitrarily complex structure, as you define it yourself)
map / mapAsync and filter, obviously, to do one-time transformations

When aggregating multiple streams via fan-in, Zip or ZipLatest with filter is much better than mergeSorted or mergeSortedN, which can be tricky as you need to wait for events on all ports.

The Graph API is very useful and I almost never had to create a custom graphStage.

Colin Breck's website was an amazing resource for understanding streams, with examples.

@glikoz I don't know what your feed looks like, but alpakka (in Scala) has a lot of resources for converting an arbitrary source to a stream. I don't know if the equivalent exists in .NET
glikoz
@glikoz
Before Akka Streams, following the KISS principle, I wanted to give plain Akka.NET and TPL a chance, and created a gist: https://gist.github.com/glikoz/bd767af231e7356bf97bd65ca76024a7
I'm not sure what the correct way is to wait for responses from multiple actors (line 21: selection.Ask(1).Wait();), but in this state Akka.NET looks 3 times faster than TPL.
glikoz
@glikoz
Although the Graph API is the long-awaited approach to managing workflows, I want to process TickEvents synchronously. https://gist.github.com/glikoz/e6ddaabe1776cdf5d242b94e2be7988d
The Sink does not work the way I hoped; it must collect the OrderRequests from all strategies in one tick cycle.
@volgad I will research the links that you sent. Thanks.
glikoz
@glikoz
https://getakka.net/articles/intro/tutorial-4.html looks promising, but I want to send messages via ActorSelection.
glikoz
@glikoz
So at the end of the day, I just need this code in https://gist.github.com/glikoz/bd767af231e7356bf97bd65ca76024a7 at
line 20: int[] result=selection.Ask(1).WaitAndCumulateAllResponses();
WaitAndCumulateAllResponses()? How?
glikoz
@glikoz
public class MainActor : ReceiveActor
{
    public MainActor()
    {
        var sw = Stopwatch.StartNew();
        var actorRefList = new List<IActorRef>();
        var taskList = new List<Task<int>>();
        foreach (var item in Enumerable.Range(0, 50))
        {
            var actorRef = Context.ActorOf(Props.Create(() => new Actor()), $"{Guid.NewGuid()}");
            actorRefList.Add(actorRef);
            taskList.Add(actorRef.Ask<int>(1));
        }

        Receive<int[]>(msg => { sw.Stop(); Console.WriteLine(sw.ElapsedMilliseconds); });

        Receive<string>(msg =>
        {
            sw.Restart();
            var xx = Task.WhenAll(taskList.ToArray())
                .ContinueWith(_ =>
                {
                    sw.Stop();
                    return taskList.Select(p => p.Result).ToArray();
                })
                .PipeTo(Self);
        });
    }
}