These are chat archives for akkadotnet/akka.net

26th
Jun 2017
Bartosz Sypytkowski
@Horusiath
Jun 26 2017 04:56
@ncthbrt what problems do you have?
fleed
@fleed
Jun 26 2017 05:50

hi everyone

I'd like to use AkkaPersistence.SqlServer. I've created the actors inheriting from ReceivePersistentActor and I can run the application... I don't receive any error, but the database is always empty. Any suggestion? I'm initializing it this way (.NET core app):

            var config = ConfigurationFactory.ParseString(@"
akka.persistence {
    journal {
        sql-server {
            # qualified type name of the SQL Server persistence journal actor
            class = ""Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer""

            # dispatcher used to drive journal actor
            plugin-dispatcher = ""akka.actor.default-dispatcher""

            # connection string used for database access
            connection-string = """ + connectionString + @"""

            # default SQL commands timeout
            connection-timeout = 30s

            # SQL server schema name to table corresponding with persistent journal
            schema-name = dbo

            # SQL server table corresponding with persistent journal
            table-name = EventJournal

            # should corresponding journal table be initialized automatically
            auto-initialize = on

            # timestamp provider used for generation of journal entries timestamps
            timestamp-provider = ""Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common""

            # metadata table
            metadata-table-name = Metadata
        }
    }

    snapshot-store {
        sql-server {
            # qualified type name of the SQL Server persistence snapshot store actor
            class = ""Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer""

            # dispatcher used to drive snapshot store actor
            plugin-dispatcher = ""akka.actor.default-dispatcher""

            # connection string used for database access
            connection-string = """ + connectionString + @"""

            # default SQL commands timeout
            connection-timeout = 30s

            # SQL server schema name to table corresponding with snapshot store
            schema-name = dbo

            # SQL server table corresponding with snapshot store
            table-name = SnapshotStore

            # should corresponding snapshot table be initialized automatically
            auto-initialize = on
        }
    }
}");
            return services.AddSingleton<IActorRefFactory>(
                provider =>
                    {
                        var actorSystem = ActorSystem.Create("Update", config);
                        var persistence = SqlServerPersistence.Get(actorSystem);
                        return actorSystem;
                    });
Aaron Stannard
@Aaronontheweb
Jun 26 2017 05:58
@fleed what version are you using?
ah, I see what the issue might be
in order for the journal et al to be created
you have to actually start one of those persistent actors
since the underlying infrastructure is all instantiated lazily
fleed
@fleed
Jun 26 2017 06:19
hi @Aaronontheweb , thank you for answering. I'm using the latest beta package available. Could it be that I need to add the plugin selection (plugin = "akka.persistence.journal.sql-server") as mentioned here https://stackoverflow.com/a/37311122?
fleed
@fleed
Jun 26 2017 06:24
actually it seems that adding that plugin line causes the system to fail
Aaron Stannard
@Aaronontheweb
Jun 26 2017 06:25
means it's loading it now
I suppose
it's 1:30am my time here, so I'm going to turn in shortly - but if you can post your error message on that SO thread one of us can take a look
fleed
@fleed
Jun 26 2017 06:34
thank you @Aaronontheweb , it is ok now
Aaron Stannard
@Aaronontheweb
Jun 26 2017 06:34
glad I could help (in whatever small way I could :p )
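For reference, the plugin selection discussed above might look like the following sketch. It assumes the same `connectionString` variable as the original snippet and trims the settings to a few keys; the remaining settings from the original would stay as they were. The `PersistenceConfig` class name is made up for illustration.

```csharp
using Akka.Configuration;

public static class PersistenceConfig
{
    // Hypothetical helper: builds a trimmed-down persistence config.
    public static Config Build(string connectionString) =>
        ConfigurationFactory.ParseString(@"
akka.persistence {
    journal {
        # selects which journal section below is actually used
        plugin = ""akka.persistence.journal.sql-server""
        sql-server {
            class = ""Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer""
            connection-string = """ + connectionString + @"""
            auto-initialize = on
        }
    }
    snapshot-store {
        # selects which snapshot-store section below is actually used
        plugin = ""akka.persistence.snapshot-store.sql-server""
        sql-server {
            class = ""Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer""
            connection-string = """ + connectionString + @"""
            auto-initialize = on
        }
    }
}");
}
```

The key has to be spelled exactly `sql-server`, with no spaces around the hyphen, so that the `plugin` path and the section name match.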
fleed
@fleed
Jun 26 2017 06:35
adding the plugin entry initializes the SqlServer persistence, but VisualStudio had automatically changed sql-server to sql - server, breaking everything
another question is: why do I get an immediate snapshot offer?

this is my constructor for the actor:

this.ActorState.ReferenceIndex = referenceIndex;
            this.ActorState.ContentHash = contentHash;
            this.Settings = settings;
            this.Command<RequestExternalContent>(this.OnRequestExternalContent);
            // other commands here
            this.Recover<SnapshotOffer>(
                offer =>
                    {
                        var state = offer.Snapshot as UnitUpdateActorState;
                        if (state == null)
                        {
                            return;
                        }

                        if (!string.IsNullOrEmpty(state.ContentHash))
                        {
                            this.ActorState.ContentHash = state.ContentHash;
                        }

                        this.ActorState.Content = state.Content;
                        if (state.ReferenceIndex > 0)
                        {
                            this.ActorState.ReferenceIndex = state.ReferenceIndex;
                        }

                        this.ActorState.Timestamp = state.Timestamp;
                        this.ActorState.UnitUpdateRequested = state.UnitUpdateRequested;
                        this.ActorState.UpdateState = state.UpdateState;
                        this.Schedule();
                    });

and state is:

public class UnitUpdateActorState
    {
        public int ReferenceIndex { get; set; }

        /// <summary>
        /// Gets or sets the timestamp.
        /// </summary>
        public DateTimeOffset Timestamp { get; set; }

        /// <summary>
        /// Gets or sets the unit update requested.
        /// </summary>
        public UnitUpdateRequested UnitUpdateRequested { get; set; }

        public UpdateContent Content { get; set; }

        public string ContentHash { get; set; }

        public UnitUpdateState UpdateState { get; set; }

        public string GetName()
        {
            return new UnitUpdateActorName(this.UnitUpdateRequested.UpdateId, this.UnitUpdateRequested.UnitId).Build();
        }
    }
fleed
@fleed
Jun 26 2017 06:40
the problem is:
I initialize the constructor with a string for contentHash and an integer higher than 0 for referenceIndex... but immediately after the construction I get a snapshot offer where the ContentHash is ok, but the ReferenceIndex is 0
fleed
@fleed
Jun 26 2017 08:13
and it seems that the snapshot store breaks the system now.. is this configuration correct?
snapshot-store {
    plugin = ""akka.persistence.snapshot-store.sql-server""
    sql-server {
        # qualified type name of the SQL Server persistence snapshot store actor
        class = ""Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer""

        # dispatcher used to drive snapshot store actor
        plugin-dispatcher = ""akka.actor.default-dispatcher""

        # connection string used for database access
        connection-string = """ + connectionString + @"""

        # default SQL commands timeout
        connection-timeout = 30s

        # SQL server schema name to table corresponding with snapshot store
        schema-name = dbo

        # SQL server table corresponding with snapshot store
        table-name = SnapshotStore

        # should corresponding snapshot table be initialized automatically
        auto-initialize = on
    }
}
Nick Cuthbert
@ncthbrt
Jun 26 2017 09:37
Hey @Horusiath. Simply struggling to understand the connection between custom serializers and persistence. I want to allow version tolerant persistence by using a schema based format such as protocol buffers.
Nick Cuthbert
@ncthbrt
Jun 26 2017 09:55
Does specifying custom serialization-bindings result in the use of custom serializers in the persistence layer?
And if that is the case, can you force custom serialization to apply only when persisting?
Bartosz Sypytkowski
@Horusiath
Jun 26 2017 13:57
@ncthbrt unfortunately, many of the persistence backends currently use the same serialization mappings as remoting (which I know may be a source of pain). A good practice is to make events separate types. Why do you need separate remoting and persistence serializers for the same type?
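As a rough illustration of the "separate event types" advice, a binding that targets only event types could look like the sketch below. The serializer class `MyApp.Serialization.DomainEventSerializer` and the marker interface `MyApp.Events.IDomainEvent` are hypothetical names, not part of Akka.NET; only the `serializers` and `serialization-bindings` sections are real configuration keys.

```csharp
using Akka.Configuration;

public static class SerializationConfig
{
    public static readonly Config Value = ConfigurationFactory.ParseString(@"
akka.actor {
    serializers {
        # hypothetical class deriving from Akka.Serialization.Serializer,
        # e.g. one that writes protocol buffers
        domain-events = ""MyApp.Serialization.DomainEventSerializer, MyApp""
    }
    serialization-bindings {
        # hypothetical marker interface implemented only by persisted events
        ""MyApp.Events.IDomainEvent, MyApp"" = domain-events
    }
}");
}
```

Because the binding is keyed on the event marker type, ordinary command and reply messages would keep using the default serializer, while anything implementing the marker goes through the custom one wherever it is serialized.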
Nick Cuthbert
@ncthbrt
Jun 26 2017 13:59
Well they don't, it is just that I'm struggling with the typesafety of the Akkling API, due to the way it treats the Persist command
Bartosz Sypytkowski
@Horusiath
Jun 26 2017 14:00
@ncthbrt personally, I think it's more struggle with .NET type safety in general. Lack of union types leaves holes in message-based systems
sometimes the only way I see is to have the actor work over the obj type anyway, and then type it explicitly after creation using the retype function
Nick Cuthbert
@ncthbrt
Jun 26 2017 14:05
I see. Would an alternative be to delegate persistence to a child actor?
Bartosz Sypytkowski
@Horusiath
Jun 26 2017 14:07
from my personal experience, you'll often need to upcast to object anyway (it's the only denominator between actor lifecycle events and custom user messages)
if F# brings fsharp/fslang-suggestions#538, this issue will be solved
Nick Cuthbert
@ncthbrt
Jun 26 2017 14:25
Fair enough
Thank you
Will give upcasting a bash
Stephen Newman
@goodisontoffee
Jun 26 2017 15:53

Is there some documentation about how to get up and running with the Hyperion solution? I'm using VS2017 but wasn't able to load the solution fully due to the apparent lack of F# support. I have F# as a language option, but the problem appears to be related to the tooling itself (projects created with the CLI not working in VS). I'm new to this issue, so I am struggling.

Reason I want to take a look? I have a type I am attempting to serialise along the lines of IEnumerable<IContainer<Nullable<DateTime>>> but upon deserialisation I receive a message along these lines:

Swallowing exception during message send
Cause: System.ArgumentException: Object of type 'IContainer`1[System.Nullable`1[System.DateTime]][]' cannot be converted to type 'System.Collections.Generic.IEnumerable`1[System.Nullable`1[System.DateTime]]'.
at System.RuntimeType.TryChangeType(Object value, Binder binder, CultureInfo culture, Boolean needsSpecialCast)
at System.RuntimeType.CheckValue(Object value, Binder binder, CultureInfo culture, BindingFlags invokeAttr)
at System.Reflection.MethodBase.CheckArguments(Object[] parameters, Binder binder, BindingFlags invokeAttr, CultureInfo culture, Signature sig)
at System.Reflection.RuntimeMethodInfo.InvokeArgumentsCheck(Object obj, BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
at System.Reflection.RuntimeMethodInfo.Invoke(Object obj, BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
at Hyperion.SerializerFactories.ImmutableCollectionsSerializerFactory.<>c__DisplayClass5_0.<BuildSerializer>b__3(Stream stream, DeserializerSession session)
at Hyperion.ValueSerializers.ObjectSerializer.ReadValue(Stream stream, DeserializerSession session)
at lambda_method(Closure , Stream , DeserializerSession )
at Hyperion.ValueSerializers.ObjectSerializer.ReadValue(Stream stream, DeserializerSession session)
at Hyperion.Serializer.Deserialize[T](Stream stream)
at Akka.Serialization.HyperionSerializer.FromBinary(Byte[] bytes, Type type)
at Akka.Serialization.Serialization.Deserialize(Byte[] bytes, Int32 serializerId, String manifest)
at Akka.Actor.ActorCell.SerializeAndDeserializePayload(Object obj)
at Akka.Actor.ActorCell.SerializeAndDeserialize(Envelope envelope)
at Akka.Actor.ActorCell.SendMessage(Envelope message)
Timeout (00:00:03) during fishForMessage
Expected: True
But was: False

It seems to lose the knowledge of IContainer<T>, interestingly this problem does happen when dealing with complex types. I was hoping that I would be able to be a good OSS citizen: add an issue, add a failing test, fix the code and raise a PR, but I have stumbled at the first hurdle. Keen to help :)

Akka.Serialization.Hyperion @1.2.1.38-beta
Hyperion @0.9.2
Akka stack @1.2.1
Bartosz Sypytkowski
@Horusiath
Jun 26 2017 16:34
@goodisontoffee the problem with F# is general to VS2017, which doesn't support it correctly out of the box.
what actually is IContainer<T> in your case?
Stephen Newman
@goodisontoffee
Jun 26 2017 17:14
It's a simple class with two properties - one is an enum, the other is an instance of T so DateTime? in this example
Jesse Connor
@jesseconnr
Jun 26 2017 17:21
What's the difference between using the PipeTo and just using actorC.Tell after the await in the documentation example? - https://gist.github.com/jesseconnr/8b313e825cef4bd838ff2d5e35f05140
Aaron Stannard
@Aaronontheweb
Jun 26 2017 17:22
imho, no real difference in this case @jesseconnr
just depends on whether you want to do the tell inside the task or afterwards I guess :p
semantically the same
Jesse Connor
@jesseconnr
Jun 26 2017 17:34
but even that example, the CreateFeedAsync uses await..
Aaron Stannard
@Aaronontheweb
Jun 26 2017 17:34
the text of that sample might be out of date
wrote most of that before Akka.NET 1.0
await in the task itself
won't hurt you
back then we didn't have full support for await inside the actor's receive method itself
actor message processing is synchronous
by design - the actors themselves are async
they activate and process messages once there's something inside their mailbox
but once they're processing their mailbox contents, that has to be done in FIFO order
so the way an async Receive works since Akka.NET 1.0
Jesse Connor
@jesseconnr
Jun 26 2017 17:37
trying to find the text, but the bootcamp does talk about the await support, but says that it will block the messages when using it
Aaron Stannard
@Aaronontheweb
Jun 26 2017 17:37
is the actor suspends its mailbox
and doesn't process any new messages other than the continuation messages for its current async FSM
yeah, what we don't allow is for interleaving for messages during await operations
because the actor's internal state has to be guaranteed to be thread-safe
as soon as we allow one actor to process multiple messages concurrently
that benefit goes away
and you're back to having to use locks and other fun stuff to synchronize memory
Jesse Connor
@jesseconnr
Jun 26 2017 17:38
so nixing the await and just returning a task and using pipeto would not suspend the inbox right?
Aaron Stannard
@Aaronontheweb
Jun 26 2017 17:39
yep
what you're doing in that case
is having the actor kick off a task
and then that task completes outside the actor
and delivers the result back as a message into the actor's mailbox
so the actor can process other messages that are in its mailbox while that task is running
that's what we're really trying to emphasize there
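A minimal sketch of the pattern just described, assuming hypothetical `FetchFeed` and `FeedFetched` messages and a stand-in `DownloadAsync` method (none of these come from the docs being discussed). The handler starts the task and returns immediately; the result comes back later as a normal message.

```csharp
using System.Threading.Tasks;
using Akka.Actor;

// Hypothetical messages, for illustration only.
public sealed class FetchFeed
{
    public FetchFeed(string url) { Url = url; }
    public string Url { get; }
}

public sealed class FeedFetched
{
    public FeedFetched(string url, int length) { Url = url; Length = length; }
    public string Url { get; }
    public int Length { get; }
}

public class FeedActor : ReceiveActor
{
    public FeedActor()
    {
        Receive<FetchFeed>(msg =>
        {
            // Capture Sender now; it is only meaningful while this message is handled.
            var replyTo = Sender;

            // Start the task without awaiting it. The handler returns right away,
            // so the actor can keep processing other mailbox messages.
            DownloadAsync(msg.Url).PipeTo(Self, replyTo);
        });

        // The finished task's result is delivered as an ordinary message,
        // with the original requester as its Sender.
        Receive<FeedFetched>(result => Sender.Tell(result));
    }

    // Stand-in for real I/O (hypothetical).
    private static async Task<FeedFetched> DownloadAsync(string url)
    {
        await Task.Delay(100);
        return new FeedFetched(url, url.Length);
    }
}
```

The `await` inside `DownloadAsync` runs in the task, outside the actor, which is why it does not hold up the mailbox.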
Jesse Connor
@jesseconnr
Jun 26 2017 17:44
gotcha, the PipeTo example the docs use would still result in the mailbox being suspended though right?
Aaron Stannard
@Aaronontheweb
Jun 26 2017 17:45
if it uses await inside the mailbox
err, the receive methods
if it's just firing off a task then nah
Jesse Connor
@jesseconnr
Jun 26 2017 17:45
it's using await Task.WhenAll(t1,t2)
then returning the result afterwards
which is why it was a bit confusing, it's not really highlighting the purpose of PipeTo
Aaron Stannard
@Aaronontheweb
Jun 26 2017 17:48
can you link me the line in the source where that happens?
doesn't sound right
It's the Ask: Send-And-Receive-Future section.
I'm writing up some tests in linqpad to verify right now.
which gets called inside a Task
and that's fine
won't affect the actor at all
Jesse Connor
@jesseconnr
Jun 26 2017 17:52
Ohh, so the issue is actually using await directly inside of a receive method.
Aaron Stannard
@Aaronontheweb
Jun 26 2017 17:52
yes
the actor determines whether or not to process the next message in its mailbox based on when the Receive method exits
Jesse Connor
@jesseconnr
Jun 26 2017 17:53
Hence the Task.Run so it just returns through the PipeTo, got it, everything just clicked, ha
Aaron Stannard
@Aaronontheweb
Jun 26 2017 17:53
:+1:
it's why we're here
Jesse Connor
@jesseconnr
Jun 26 2017 17:53
thanks
Aaron Stannard
@Aaronontheweb
Jun 26 2017 17:57
no problem
give us a star on Github if you like the framework :p
chipdice
@chipdice
Jun 26 2017 18:00
@Aaronontheweb Do you have any examples of using PipeTo with a DataFlow bufferBlock? I have an ActiveMQ callback that is using BufferBlocks and not sure how to hook it up
Aaron Stannard
@Aaronontheweb
Jun 26 2017 18:01
that's TPL data flow right?
chipdice
@chipdice
Jun 26 2017 18:02
yes
Aaron Stannard
@Aaronontheweb
Jun 26 2017 18:03
oh man, that's a cool idea
hadn't thought of that - when I started my Akka port to .NET I originally built the entire thing on TPL Dataflow
back in the day
haven't revisited it since
chipdice
@chipdice
Jun 26 2017 18:04
It would be very useful in my case. I am currently just using a regular class and having it Tell my actor with each callback
I guess what I would do is have an actor do something like this...
bufferBlock.ReceiveAsync().PipeTo(Self)
and just do that in a loop
man.... TPL Dataflow would be a great addition to Alpakka
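The "do that in a loop" idea could be sketched roughly as below. This is an assumption about how it might be wired, not a tested integration with ActiveMQ: the `BufferReaderActor` name and the `consumer` parameter are hypothetical, and the actor simply forwards each item. Instead of a blocking loop, the actor requests one item at a time and asks for the next only after the previous one arrives.

```csharp
using System.Threading.Tasks.Dataflow;
using Akka.Actor;

public class BufferReaderActor : ReceiveActor
{
    private readonly BufferBlock<string> _buffer;

    public BufferReaderActor(BufferBlock<string> buffer, IActorRef consumer)
    {
        _buffer = buffer;

        Receive<string>(item =>
        {
            consumer.Tell(item); // hand the item to whoever processes it
            RequestNext();       // then ask the buffer for the following item
        });

        // By default PipeTo reports a faulted task as Status.Failure.
        // ReceiveAsync faults once the block completes with nothing left to read.
        Receive<Status.Failure>(_ => Context.Stop(Self));
    }

    protected override void PreStart() => RequestNext();

    // One outstanding read at a time; its result is piped into our own mailbox.
    private void RequestNext() => _buffer.ReceiveAsync().PipeTo(Self);
}
```

The callback side would then only need to `Post` into the buffer, and the actor picks items up in order without ever blocking a thread.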
Boban
@bobanco
Jun 26 2017 18:06
@Aaronontheweb , @chipdice it's perfect fit for writing custom graph stage
Boban
@bobanco
Jun 26 2017 18:07
and it shouldn't be hard to write it
Aaron Stannard
@Aaronontheweb
Jun 26 2017 18:07
yeah, right on @bobanco
I was thinking you could even turn it into a publisher
Reactive.Streams.IPublisher
any time the buffer block receives something it can drive the stream
what do you think @Silv3rcircl3 ?
Marc Piechura
@marcpiechura
Jun 26 2017 18:10
Does it support backpressure, i.e. only allow another message once the task or whatever has finished in the buffer block?
Aaron Stannard
@Aaronontheweb
Jun 26 2017 18:11
I think so, if you specify a maximum size
Marc Piechura
@marcpiechura
Jun 26 2017 18:12
Then it would indeed make a perfect fit, but not sure about the IPublisher, at least for alpakka it would be easier to use a custom graphstage
But if you implement it as IPublisher other libraries could use it too, Reactor.net for example
Boban
@bobanco
Jun 26 2017 18:13
it's easier with custom graphstage, @Silv3rcircl3 and yes it does support backpressure if you make it bounded
Marc Piechura
@marcpiechura
Jun 26 2017 18:13
But it's harder since you need to pass the publisher verification specs from the TCK
yeah, looks like the buffer block can return false here if adding a new message exceeds capacity
well there you go @chipdice - looks like you have some choices :p
Akka.Streams sounds like a good fit
based on what @bobanco and @Silv3rcircl3 have said
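The bounded-capacity behaviour mentioned above can be seen with plain TPL Dataflow, no Akka involved. This is a small sketch with a made-up `BoundedBufferDemo` class: `Post` gives up immediately when the block is full, while `SendAsync` waits until a slot frees up, which is the backpressure-friendly option.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedBufferDemo
{
    public static async Task<bool[]> RunAsync()
    {
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 2 });

        bool first = buffer.Post(1);   // accepted
        bool second = buffer.Post(2);  // accepted, the buffer is now full
        bool third = buffer.Post(3);   // declined at once: no room left

        // SendAsync does not give up; its task stays pending until there is room.
        Task<bool> pending = buffer.SendAsync(3);
        bool stillWaiting = !pending.IsCompleted;

        await buffer.ReceiveAsync();   // consuming one item frees a slot
        bool accepted = await pending; // the postponed item is now taken

        return new[] { first, second, third, stillWaiting, accepted };
    }
}
```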
chipdice
@chipdice
Jun 26 2017 18:15
That's cool. I haven't used them yet, so I'll take a look and see. Thanks for the info
Marc Piechura
@marcpiechura
Jun 26 2017 18:16
That being said, build it as custom graph stage and use Sink.AsPublisher for other libraries :-)
Boban
@bobanco
Jun 26 2017 18:16
@Silv3rcircl3 good idea :+1:
Marc Piechura
@marcpiechura
Jun 26 2017 18:17
Speaking of, @Arkatufus sorry totally forgot your CSV PR, will review it today or tomorrow
chipdice
@chipdice
Jun 26 2017 18:18
I may need to ask you guys for a little more guidance when I find the time to look at this. I'm not real familiar with all this, however I'd like to be more involved
Marc Piechura
@marcpiechura
Jun 26 2017 18:20
Happy to help out with any question 👍
Jesse Connor
@jesseconnr
Jun 26 2017 18:30
So I tried this in linqpad to see if it would wait 10 seconds before processing other messages when using await in the Receive, but it doesn't.
Bartosz Sypytkowski
@Horusiath
Jun 26 2017 18:32
@Aaronontheweb TPL Dataflow doesn't give you anything you couldn't achieve with Akka.Streams (quite the contrary). Speaking of which, the #2621 proposal adds some usability to the current design - i.e. personally I couldn't think of a single person who would admire the current SourceQueue API
Marc Piechura
@marcpiechura
Jun 26 2017 18:33
@jesseconnr afaik you need to use ReceiveAsync for the blocking message
Jesse Connor
@jesseconnr
Jun 26 2017 18:33
Yep, that did it.
So why bother with PipeTo and embedded tasks if await/async doesn't block without specifically choosing ReceiveAsync ?
Bartosz Sypytkowski
@Horusiath
Jun 26 2017 18:38
@jesseconnr ReceiveAsync is slower and blocks reentrancy (actor processing multiple request flows at the same time)
Jesse Connor
@jesseconnr
Jun 26 2017 18:39
Sure, but it's not required to use when you're doing async/await. I used both with a normal Receive in the test and it didn't block and didn't need PipeTo.
Bartosz Sypytkowski
@Horusiath
Jun 26 2017 18:40
aren't you talking about Receive<T>(Func<T, Task>) here?
Jesse Connor
@jesseconnr
Jun 26 2017 18:40
Receive<BlockingMessage>(async msg => 
{
    var result = await DoAsyncWork(msg.Message);

    MyActorSystem.ActorA.Tell(result);
});
^ I expected that to block, from using await directly in the body, but it didn't.
Bartosz Sypytkowski
@Horusiath
Jun 26 2017 18:42
so in your example, ActorA.Tell is called before DoAsyncWork completes?
Jesse Connor
@jesseconnr
Jun 26 2017 18:43
No, it waits 10 seconds, I'm saying other messages are still processed.
I was expecting the actor to suspend its message processing.
Natan Vivo
@nvivo
Jun 26 2017 19:57
@jesseconnr I think Receive<>(Task) was removed in the latest version, in favor of ReceiveAsync
there is a good chance c# is transforming your async block into an Action like an "async void" instead of Func<Task>
try with ReceiveAsync<BlockingMessage>
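The "async void" conjecture can be illustrated without Akka at all. In this sketch, `Dispatch` is a hypothetical stand-in for any registration method that only accepts an `Action<T>`: an async lambda passed to it compiles as async void, so the call returns at the first incomplete `await` and the caller has no way to know the work is still running.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Hypothetical stand-in for a handler API that takes Action<T>; not an Akka.NET method.
    public static void Dispatch<T>(T message, Action<T> handler) => handler(message);

    public static async Task<bool> HandlerReturnedBeforeWorkFinished()
    {
        var gate = new TaskCompletionSource<bool>();
        var workFinished = new TaskCompletionSource<bool>();

        // The async lambda is converted to Action<string>, i.e. async void.
        Dispatch("msg", async m =>
        {
            await gate.Task;              // the handler yields to its caller here
            workFinished.SetResult(true); // runs only after the gate opens
        });

        // Dispatch has already returned although the async work is not done.
        bool returnedEarly = !workFinished.Task.IsCompleted;

        gate.SetResult(true);
        await workFinished.Task;
        return returnedEarly;
    }
}
```

If a `Receive` overload binds the lambda this way, the actor would see the handler as finished at the first `await` and move on to the next message, which matches the behaviour observed above.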
Gregorius Soedharmo
@Arkatufus
Jun 26 2017 20:05
@Silv3rcircl3 lol, no problem XD
@Silv3rcircl3 btw, make sure the end of lines for the test files doesn't change, or the test will fail
Jesse Connor
@jesseconnr
Jun 26 2017 20:09
@nvivo ReceiveAsync blocks, but after reading through the release notes I found my mistake. Support for async/await was added in 1.0 with AsyncBehavior.Suspend and AsyncBehavior.Reentrant to let you choose the behavior. The notes still recommend using PipeTo instead as it's more explicit.
Natan Vivo
@nvivo
Jun 26 2017 20:17
1.0 is quite old
not even sure if suspend and reentrant are still there
Aaron Stannard
@Aaronontheweb
Jun 26 2017 20:17
I fixed a lot of that stuff in 1.1
Natan Vivo
@nvivo
Jun 26 2017 20:18
but in general, if you want to block, use ReceiveAsync. If you don't want to block, use Receive with PipeTo
I personally think PipeTo complicates things a little bit, but it's there. If I want the actor to never block, I prefer to delegate the async work to a worker that tells the parent when it's done, and that actor just uses async/await. I think it's cleaner. But, pipeto is there for some cases
note that my opinion diverges from @Aaronontheweb on this =)
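For the "if you want to block" half of that rule of thumb, a sketch with `ReceiveAsync` might look like this. The `SuspendingActor` name and the upper-casing work are made up for illustration; the point is only that the handler returns a `Task`, so the actor waits for it before taking the next message.

```csharp
using System.Threading.Tasks;
using Akka.Actor;

public class SuspendingActor : ReceiveActor
{
    public SuspendingActor()
    {
        // The handler is a Func<string, Task>, so the actor can tell when it is done.
        ReceiveAsync<string>(async text =>
        {
            var replyTo = Sender;  // captured up front to keep the example unambiguous

            await Task.Delay(100); // no other user message is processed during this wait

            replyTo.Tell(text.ToUpperInvariant());
        });
    }
}
```

Messages sent while the handler is awaiting simply queue up and are handled in order afterwards, which is the trade-off against the PipeTo style.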
Jesse Connor
@jesseconnr
Jun 26 2017 20:20
I had the same qualms at first, but really, pipeto ends up being pretty clean considering you don't have to await anything.
Aaron Stannard
@Aaronontheweb
Jun 26 2017 20:20
<3
Jesse Connor
@jesseconnr
Jun 26 2017 20:20
SomeAsyncMethod().PipeTo(Self)
Natan Vivo
@nvivo
Jun 26 2017 20:21
yeah, there is akka for all tastes =)
Bartosz Sypytkowski
@Horusiath
Jun 26 2017 20:22
@nvivo reentrant/suspended behavior was removed in 1.0.1 ;) first patch after official release ;P
Natan Vivo
@nvivo
Jun 26 2017 20:22
but in the end, it really depends on what you want to do in each case. sometimes what you want is to block. the idea of async/await is to make async look like sync, so it makes sense
Jesse Connor
@jesseconnr
Jun 26 2017 23:33
Any of you guys know of a good crud example using Akka? The closest I've found was the DSL demo on the use-cases page in the docs.
I feel like my messages are multiplying like rabbits, but I'm really not seeing any other way to do it, it's basically every object = 4 messages minimum.