These are chat archives for akkadotnet/akka.net

5th
Sep 2016
riawz
@riawz
Sep 05 2016 00:55

I'm looking at using akka.net streams with BlockingCollection. I was hoping there's more resources on this topic. I'm not sure exactly how it works yet. But I'm doing something like this

Source.From(_c.Stream) .RunForeach(x => self.Tell(x), Context.System.Materializer());

I assume something somewhere would be calling the iterator for me (i.e. MoveNext), that this call would block, and that akka streams would "figure out" a way to do the wait without blocking?

The Stream is a wrapper which underneath is just C# BlockingCollection.
riawz
@riawz
Sep 05 2016 01:04

And another question is, if I have some class which needs to listen to delegate events, when this instance is created within an actor, the event listening stops. Is there any best practice to make this work? I'm currently having this instance living inside the console host app which also hosts the akka system. But this would not enable me to be able to restart the actor (and the instance) in case of a failure.

Sorry forgot to mention that I'm quite new to akka. And thanks for any suggestions.

Abdelmawla Mohamed
@abdomohamed
Sep 05 2016 03:21
@Horusiath and @Silv3rcircl3 thanks a lot guys. It's fixed now, it was BsonClassMap. I'm wondering why this wasn't throwing any errors?
Because I believe recovering the journal was throwing an unknown discriminator exception, but that doesn't happen with snapshots
Bartosz Sypytkowski
@Horusiath
Sep 05 2016 06:38
@abdomohamed I don't know why there were no errors to be honest, maybe it's a good reason to open an issue
Bartosz Sypytkowski
@Horusiath
Sep 05 2016 06:45
@riawz streams that take an IEnumerable as a source do indeed call MoveNext, but there is no way to make that non-blocking. You cannot change the behavior of the collection you're working with. What you may be interested in is Source.Queue(bufferSize, overflowStrategy), which works with fully asynchronous queues.
Then your example can be changed to something like:
Bartosz Sypytkowski
@Horusiath
Sep 05 2016 06:52
using (var system = ActorSystem.Create("sys"))
using (var materializer = system.Materializer())
{
    var queue = Source.Queue<int>(bufferSize: 10, overflowStrategy: OverflowStrategy.Backpressure)
        .To(Sink.ActorRef<int>(self, new MyCompleteMessage()))
        .Run(materializer);

    (await queue.OfferAsync(1337)).Match()
        .With<QueueOfferResult.Enqueued>(x =>  /**/)
        .With<QueueOfferResult.Dropped>(x =>  /**/)
        .With<QueueOfferResult.Failure>(x =>  /**/)
        .With<QueueOfferResult.QueueClosed>(x =>  /**/);

    Console.ReadLine();
}
riawz
@riawz
Sep 05 2016 07:09

@Horusiath thanks. I'll take a look at Source.Queue. However I'm not sure if I'll be able to use this way because the stream comes from an external lib. I was planning to do it this way. But the underlying blocking behaviour still exists.
    Task.Run(() => {
        if (_c.Stream.MoveNext())
            self.Tell(_c.Stream.Current);
        else
            self.Tell(null);
    }).PipeTo(Self);

Bartosz Sypytkowski
@Horusiath
Sep 05 2016 07:11
@riawz you cannot make blocking queue non-blocking ;)
riawz
@riawz
Sep 05 2016 07:11
@Horusiath sadly, no
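
A blocking collection stays blocking, but the blocked thread does not have to be an actor thread or a thread-pool thread. The following is a minimal sketch, not an Akka.NET API: `BlockingPump` and its `onItem`/`onCompleted`/`onError` callbacks are hypothetical names, with the callbacks standing in for `self.Tell(...)` on an `IActorRef` captured in a local variable beforehand. It assumes the external library exposes (or can be wrapped as) an `IEnumerable<T>`, such as `BlockingCollection<T>.GetConsumingEnumerable()`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper (not part of Akka.NET): drains a blocking source on its
// own dedicated thread, so only that thread is ever parked inside MoveNext.
public static class BlockingPump
{
    // onItem, onCompleted and onError are assumptions standing in for
    // self.Tell(item), self.Tell(completed) and self.Tell(failure).
    public static Thread Start<T>(
        IEnumerable<T> blockingSource,
        Action<T> onItem,
        Action onCompleted,
        Action<Exception> onError)
    {
        var thread = new Thread(() =>
        {
            try
            {
                // foreach calls MoveNext, which blocks until an item arrives
                // or the source signals completion.
                foreach (var item in blockingSource)
                    onItem(item);
                onCompleted();
            }
            catch (Exception ex)
            {
                onError(ex);
            }
        })
        { IsBackground = true, Name = "blocking-pump" };

        thread.Start();
        return thread;
    }
}

// Usage with a BlockingCollection<int> named items:
//   var pump = BlockingPump.Start(items.GetConsumingEnumerable(),
//       x => Console.WriteLine(x), () => { }, ex => { });
//   items.Add(1); items.CompleteAdding(); pump.Join();
```

This does not remove the blocking wait; it only confines it to one thread that the actor system does not depend on.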
riawz
@riawz
Sep 05 2016 07:16

@Horusiath I was hoping if you could help me out with my other question.

If I have a class which creates its own thread etc. and listens to events, and I pass this class in as a dependency into an actor, everything works (i.e. event listening). However, this way I would not be able to manage the actor's life cycle (e.g. restart etc.). If I create a new instance of this class inside the actor, the event listening stops working. Is this an akka "feature", or is something suspending the actor, hence suspending the class instance, hence stopping the event listening?

Bartosz Sypytkowski
@Horusiath
Sep 05 2016 07:17
can you show this on the code?
except I'm not using WebClient
Bartosz Sypytkowski
@Horusiath
Sep 05 2016 07:21
I don't know why you are creating your own thread, but as I said, it's hard to follow without an example
riawz
@riawz
Sep 05 2016 07:30
I meant the external lib creates it, I'm taking it in as a dependency inside the actor
I don't create any thread manually
sorry don't have any example on hand
Bartosz Sypytkowski
@Horusiath
Sep 05 2016 07:37
it's hard to discover the reason without it. I know it should be possible to work with the C# event API without problems. Also, your use case suggests that you need to combine akka.streams with kafka, am I right?
riawz
@riawz
Sep 05 2016 07:41

I started with the Task.Run().PipeTo(Self) so that I'm at least putting the blocking nature onto a different thread (this is the only place where I "manually do threading"), then I started looking for alternatives ie Streams. But as you said, can't make blocking non blocking, I think I'm heading to a dead end because the underlying lib uses BlockingCollection.

The eventing is a separate issue which I'm still investigating. I'll report back with new findings.

Marc Piechura
@marcpiechura
Sep 05 2016 08:33
@riawz with Streams 2.4.3 we get UnfoldResource and UnfoldResourceAsync, maybe that's something for you
You can take a look at this PR akkadotnet/akka.net#2292 and search for UnfoldResourceSource
It's for accessing resources for example file IO including open, read and close calls
Konrad `ktoso` Malawski
@ktoso
Sep 05 2016 08:35

hi everyone, we're running a community survey (the JVM Akka) but feedback from the .NET side of things would also be interesting :) Feel free to participate in the survey: http://blog.akka.io/community/2016/09/01/akka-community-survey-2016

thanks in advance :)

DmitriySelischev
@DmitriySelischev
Sep 05 2016 12:27

Hello, guys!
I am trying to test an actor which creates a child inside it. Here is the code:

var self = Self;
_remoteAgent = Context.ActorOf(
    Props.Create(() => new SISProcessorRemoteAgent(self))
        .WithRouter(FromConfig.Instance),
    RemoteAgentName);

And it crashes without any exception, but I concluded that it was due to FromConfig.Instance.
When I specify new RoundRobinPool(1) (for example), everything works.
There is my configuration:

actor {
    deployment {
        "/bulk-process-coordinator/*/remote-agent" {
            router = round-robin-pool
            nr-of-instances = 1
        }
    }
}

I specify this configuration in the TestKit constructor.
In the real environment it works.
What should I do to solve it? Thank you.

Arjen Smits
@Danthar
Sep 05 2016 12:46
@DmitriySelischev You need to specify it as "user/bulk-process-coordinator/*/remote-agent"
DmitriySelischev
@DmitriySelischev
Sep 05 2016 12:47
ok, i'll try, thanks
Arjen Smits
@Danthar
Sep 05 2016 12:47
user actors are always run under the user guardian actor
thus they start with user/
DmitriySelischev
@DmitriySelischev
Sep 05 2016 12:48
in the real environment it works, but not under testkit
@Danthar unfortunately, using "user/" also crashes
Arjen Smits
@Danthar
Sep 05 2016 12:52
oh wait. I confused it with group routers
you're correct in your initial definition
as to why it's not working in the testkit.
not sure.
I normally don't test my actor infrastructure like that.
I test actors in isolation. The fact that they are part of a pool/group is an infrastructure concern as far as I see it.
DmitriySelischev
@DmitriySelischev
Sep 05 2016 12:54
I read that other people use special RemoteProviders which get replaced with TestProbes in the testkit
anyway, thanks for your answer
Arjen Smits
@Danthar
Sep 05 2016 12:58
hold on
so I'm looking at the routing spec to see if we do something different there
There are several ways of defining this thing. I only use the code api to define my routers, since I do some exotic stuff in my own projects.
So i had to look up the other methods:
starting from there, and the testcase below that
show some alternative ways to define them.
So you could probably try doing: Context.ActorOf(FromConfig.Instance.Props(Props.Create<BlackHoleActor>()), "router1")
but then with your own actor type
DmitriySelischev
@DmitriySelischev
Sep 05 2016 13:01
ok, i'll try
Arjen Smits
@Danthar
Sep 05 2016 13:02
However I think your initial configuration method should be fine. So it could also be something weird in the TestKit
Vagif Abilov
@object
Sep 05 2016 13:04
I wonder if there are some precautions to take when sending Ask to routees. I have a consistent hash router that I never have problems with as long as actors are sending Tell messages to it. But sometimes there is a need to query a routee, so an actor sends an Ask to it. While it mostly goes fine, occasionally the routee doesn't return within the timeout interval (1 minute). So the sender gets restarted and the message from the routee is unhandled. Is there anything special that we need to be aware of when asking actors in a pool?
Bartosz Sypytkowski
@Horusiath
Sep 05 2016 13:14
@object maybe you've got too high request congestion for that routee? after all, every request lands on a queue, and if they are coming in faster than they can be served, you'll get a timeout eventually
Vagif Abilov
@object
Sep 05 2016 13:24
The system is not under very high load.
I will try to extract small example if I manage to reproduce this behavior.
DmitriySelischev
@DmitriySelischev
Sep 05 2016 14:18
@Danthar Thank you for your description, but I've discovered that this issue with the router happens when the path of the router actor uses a wildcard. I think that is the problem I have. Maybe testkit cannot work with wildcard paths?
DmitriySelischev
@DmitriySelischev
Sep 05 2016 14:38
@Danthar Yeah! I found why this issue happens! It happens because the actor that creates the actor with the router does not have the right parent assigned in the test
Vagif Abilov
@object
Sep 05 2016 14:41
I've investigated my problem with the timeout on Ask some more, and the log looks weird:
16:17:38.970 Debug [akka://Oddjob/user/akamai_uploader/$i#97710076] Querying actor sftp.1 for status
16:17:38.970 Debug [akka://Oddjob/user/sftp.1/$h#731709871] Received QueryStatus in transfer state
16:17:38.970 Debug [akka://Oddjob/user/sftp.1/$h#731709871] Replied to QueryStatus: Processing
16:17:39.984 Warning [akka://Oddjob/user/akamai_uploader/$i#97710076] Timeout while waiting for storage actor status
16:17:40.015 Debug akka://Oddjob/user/akamai_uploader/$i Unhandled message from akka://Oddjob/user/sftp.1/$h :
16:17:40.015 Warning [akka://Oddjob/user/akamai_uploader/$i#97710076] Unhandled message Processing
The timeout is set to 1 sec, there is no load on the system, and as you can see the routee (sftp.1) replies instantly.
However after 1 second the sender (akamai_uploader) times out while waiting!
What's really strange is that the message then appears in its mailbox and is logged as unhandled.
The whole thing looks like the sender is blocked from receiving messages until the timeout is raised, and then the message is received but of course goes unhandled.
BTW the code is in F#
Vagif Abilov
@object
Sep 05 2016 14:52
And here's the code:
let (status : SftpCommandTargetStatus) =
    logMessage mailbox <| sprintf "Querying actor %s for status on" storageActor.Path.Name
    try
        storageActor.Ask(QueryStatus, TimeSpan.FromSeconds(1.)) |> Async.RunSynchronously
    with _ ->
        logWarning mailbox <| sprintf "Timeout while waiting for storage actor status"
        SftpCommandTargetStatus.Idle
Vagif Abilov
@object
Sep 05 2016 15:24
BTW in the production code it's F# operator <? that is used, not Ask.
Chris G. Stevens
@cgstevens
Sep 05 2016 16:02
I need something like ScheduleTellRepeatedly, but more like a ScheduleTellDaily that can fire on any chosen days at a specific time.
I was thinking of taking something similar to the snippet below and extending ScheduleTell with it, and I'm wondering how easy or hard that would be.
I think there are several use cases... does anyone else have any thoughts?
new TaskSchedule()
        .WithName("DailySchedule Days Sat, Sun, Wed @ 6PM")
        .RepeatDaily()
        .WithStartDate(new DateTime(2016, 9, 3))
        .WithStartTime(new TimeSpan(18, 0, 0))
        .WithDaysOfWeek(new List<DayOfWeek>() { DayOfWeek.Saturday, DayOfWeek.Sunday, DayOfWeek.Wednesday })
        .Message(new GoDoSomeWork())
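
The core of a ScheduleTellDaily-style helper is working out when the next firing is due. A minimal sketch under stated assumptions: `DailySchedule.NextOccurrence` is a hypothetical helper (not part of Akka.NET or of the `TaskSchedule` builder above), it ignores the start date, and it does no time zone or daylight-saving handling.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: finds the next firing time of a "these weekdays at
// this time of day" schedule. Not an Akka.NET API.
public static class DailySchedule
{
    // Returns the first instant strictly after `now` that falls on one of
    // `days` at `timeOfDay` (expected to be between 00:00 and 24:00).
    public static DateTime NextOccurrence(
        DateTime now, TimeSpan timeOfDay, ISet<DayOfWeek> days)
    {
        if (days.Count == 0)
            throw new ArgumentException("At least one day is required.", nameof(days));

        // Offset 7 covers the case where today is the only allowed day
        // and today's slot has already passed.
        for (var offset = 0; offset <= 7; offset++)
        {
            var candidate = now.Date.AddDays(offset) + timeOfDay;
            if (candidate > now && days.Contains(candidate.DayOfWeek))
                return candidate;
        }

        throw new InvalidOperationException("No occurrence found within a week.");
    }
}
```

With Saturday, Sunday and Wednesday at 18:00 and a current time of Monday 2016-09-05 16:02, this returns Wednesday 2016-09-07 18:00. The difference between that value and the current time could then be handed to a one-shot call such as ScheduleTellOnce, recomputing the next occurrence each time the message arrives. Such a timer lives in memory only and would not survive a restart.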
Vagif Abilov
@object
Sep 05 2016 16:33
@cgstevens But is it a good idea to use this scheduler for such long running tasks? I once talked about this with @Horusiath and he suggested to use something like Quartz.net instead.
Bartosz Sypytkowski
@Horusiath
Sep 05 2016 16:49
@object if I understand your logs correctly, it looks like QueryStatus has been received, but the response was either not sent (maybe an exception occurred on the responder side) or was not delivered back (which shouldn't occur if both actors are on the same node)
@cgstevens good idea, but it needs support from the persistence module. I proposed that on akka JVM too some time ago, but since the priority was low, nobody was really eager to implement it. Currently you can use the Quartz.net integration to achieve similar results.
Vagif Abilov
@object
Sep 05 2016 16:55
@Horusiath correct. And as the logs say, the recipient received QueryStatus and replied immediately. But the answer was received by the sender only after the Ask timed out, so the Ask never received anything.
In the example above I may have gotten the Ask await wrong (didn't use ContinueWith and AwaitTask), but as I said, in the real production code I am using the <? operator.
Chris G. Stevens
@cgstevens
Sep 05 2016 16:58
thank you for the input
Vagif Abilov
@object
Sep 05 2016 16:58
One thing that may or may not be important: when this error happens, the actor who is supposed to respond to Ask may have some active background work (using BeginXXX/EndXXX pattern).
Jacob Appleton
@jacobappleton
Sep 05 2016 17:52
Howdy, is there an equivalent of ReceiveAsync in ReceivePersistentActor? I'm trying to Ask another actor something and then pass that along, but I'm not sure how to do it in ReceivePersistentActor.
I'm very new to Akka.Net, so I might be missing something fundamental...
Bartosz Sypytkowski
@Horusiath
Sep 05 2016 18:36
@jacobappleton fundamental advice is to not use Ask between actors ;) I've written a blog post on that: http://bartoszsypytkowski.com/dont-ask-tell-2/
Vagif Abilov
@object
Sep 05 2016 18:40
I wholeheartedly second this advice!
And the issue I described above made me start revising the whole related workflow so that it will only use Tell. Still, I have a hard time explaining why this happens.
Daniel D'Agostino
@dandago2_twitter
Sep 05 2016 19:36
I'm wondering, what is the point of DI in Akka .NET? Typically DI is used to swap out implementations (and also to handle automatic injection of dependencies as the system becomes more complex), and by far the most common usage is to substitute real dependencies with mocks. In Akka .NET, you pretty much can't write a (real) unit test for anything that uses the built-in logging, config or messaging (e.g. Tell()) systems. You have to use TestKit. DI feels very much like an afterthought, not to mention the weird and error-prone way it's provided as an extension method; is there any real advantage to it that I'm missing?
Bartosz Sypytkowski
@Horusiath
Sep 05 2016 20:02
@dandago2_twitter the DI API will be rebuilt sometime in the future. tbh it seems to have low priority, as the number of people loving DI is pretty low here
Daniel D'Agostino
@dandago2_twitter
Sep 05 2016 20:03
yeah I understand it has some problems as well
but honestly it does not look like Akka .NET is designed for DI (most likely a legacy of the original Akka), so I was wondering whether there is any real use for it
personally I normally use DI a lot
Bartosz Sypytkowski
@Horusiath
Sep 05 2016 20:05
I think akka in general was designed to be more oriented towards functional programming (and it's going to be even more so in the future)... and it reflects the attitude of a typical functional programmer towards DI
Daniel D'Agostino
@dandago2_twitter
Sep 05 2016 20:21
I see. Makes sense.
qwoz
@qwoz
Sep 05 2016 22:08
I have a system which contains potentially hundreds of thousands of objects that need to be updated on a schedule (some every hour, every day, every week... that kind of thing). This is to do a recalculation of values which are date/time dependent. My current thinking: upon instantiation of the Actor System, create one actor that handles scheduling which then rehydrates all relevant actors (one actor per object) and tells it to update itself. Is there a better approach?