These are chat archives for akkadotnet/akka.net

11th
Feb 2018
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 09:47

I can't seem to work out the query part of persistence, can anyone enlighten me?
I am getting

             System.MissingMethodException: 'Method not found: 'Void Akka.Streams.Stage.GraphStageLogic.SetHandler(Akka.Streams.Outlet, System.Action, System.Action)'.'

I am following the exmaples I could find exactly

            var readJournal = PersistenceQuery.Get(system).ReadJournalFor<RedisReadJournal>("akka.persistence.query.journal.redis");
            Source<EventEnvelope, NotUsed> source = readJournal.EventsByPersistenceId("Tenants");

            // materialize stream, consuming events
            var mat = ActorMaterializer.Create(system);
            source.RunForeach(envelope =>
            {
                Console.WriteLine($"event {envelope}");
            }, mat);

this is my config, should all be fine right?

persistence.query {
    journal {
        plugin = ""akka.persistence.query.journal.redis""
        redis {
            class = ""Akka.Persistence.Redis.Query.RedisReadJournalProvider, Akka.Persistence.Redis""
            configuration-string = ""127.0.0.1:6379""
            key-prefix = ""Reservations""
            database = 2
        }
    }
}
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 10:05
Never mind, cloned the git repo and updated package, error is gone. Nothing is written yet, so need some more debugging
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 10:41
done, max-buffer-size has no default value, so the plugin never gets any event as default(int) is zero ....
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 11:36
I have a separate console app doing the heavy lifting with regard to commands/ event/ persisting/ validation etc. I am using remote to feed commands and events into the system. When other systems want to create a Read part of the CQRS system, I need to get the events, so I created an actor to send messages, what do you think?
public class PublishEventsActor : ReceiveActor
{
    private readonly ActorSystem system;

    public PublishEventsActor(ActorSystem system)
    {
        Receive<string>(msg => Handle(msg));
        this.system = system;
    }

    private void Handle(string msg)
    {
        if (msg.Equals("start"))
        {
            var sender = Sender;
            var idsKnown = new HashSet<string>();
            //create a journal query, sending all messages to the actor in the wrapper
            // obtain read journal by plugin id
            var readJournal = PersistenceQuery.Get(system).ReadJournalFor<RedisReadJournal>("akka.persistence.query.journal.redis");
            var mat = ActorMaterializer.Create(Context);

            Source<string, NotUsed> persistenceIds = readJournal.PersistenceIds();
            persistenceIds.RunForeach(s =>
            {
                if (idsKnown.Add(s))
                {
                    Source<EventEnvelope, NotUsed> source = readJournal.EventsByPersistenceId(s);
                    source.RunForeach(envelope =>
                    {
                        sender.Tell(envelope);
                    }, mat);
                }
            }, mat);
        }
    }
}
Marc Piechura
@marcpiechura
Feb 11 2018 11:46
@AndreSteenbergen sounds like a perfect use case for StreamRefs, but you would need to wait a bit until it’s available akkadotnet/akka.net#3321
Marc Piechura
@marcpiechura
Feb 11 2018 11:56
Regarding your code, you could simplify it a bit by using StatefulSelectMany for selecting either Source.Empty or envelop source from backend and calling ConcatMany afterwards
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 12:55
I will subscribe to that issue number, thx. Regarding your second comment, I don't understand what you mean.
Is there any way to know when the original sender dies or becomes unreachable? It would be useless to send new messages to dead or unreachable actors
Marc Piechura
@marcpiechura
Feb 11 2018 14:22
I‘m on the phone so can’t provide code but you could merge the nested foreach loops into a single stream. Regarding unreachable actors you can use Conext.Watch(otherActorRef) and will get a message once the actor dies
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 14:28
I would appreciate a pointer on source code how to merge the streams, as this is my first attempt working with streams. My previous attempt works, but the actors were almost like repositories, which I didn't like anymore. This is my first attempt in getting data in a different datastore, without relying on the actors to do the writing themselves.
Thanks for the pointer about Context.Watch
Onur Gumus
@OnurGumus
Feb 11 2018 14:29
@AndreSteenbergen I mean you have tried using the Merge block right ?
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 14:31
Actually, no, as I am reading through the documentation (http://getakka.net/articles/persistence/persistence-query.html) and it isn't all that helpful in the more advanced stuff
This was the best I could come up with
Onur Gumus
@OnurGumus
Feb 11 2018 14:32
but that article is not about streams
@AndreSteenbergen the key point of understanding streams is you should understand what a materializer really is.
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 14:33
I know, but it is my entry point in working with the write part of my CQRS system
Onur Gumus
@OnurGumus
Feb 11 2018 14:33
That's the first thing you should really grasp.
I would stick to actors if you are into event sourcing.
They are not very streamish.
Streams are good if you have stream data and your focus is about how this data is routed, you need things like backpressure etc
backpressure is the key point of streams anyway.
if you don't need then I would reconsider my decision on streams
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 14:37
It wasn't my decision to go this way actually, I came here after seeing PersistentViews were deprecated
Onur Gumus
@OnurGumus
Feb 11 2018 14:37
Yes but there is persistentquery you can use.
just use that one
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 14:39
I am actually ... I think,
I am following the example on persistence query documentation, it gave me a stream of events.
Onur Gumus
@OnurGumus
Feb 11 2018 14:45
Okay
In this case you can just run them with foureach
You don't need to know much about events.
sorry I mean streams
Why you want to merge things?
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 14:46
It was suggested by Marc as a simplification of above code snippet
Onur Gumus
@OnurGumus
Feb 11 2018 14:46
Oh I see
@AndreSteenbergen I think you need something like a group by
What merge does is , it merges one or more streams into a single stream, which ever produces a value.
I myself don't see any reason to use merge.
Onur Gumus
@OnurGumus
Feb 11 2018 14:52
Maybe he wanted to tell you can merge the child streams
Can you instead just group by the stream by persistence id?
@AndreSteenbergen But I think your code is also fine.
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 14:58
Thank I'll leave it for what it is for now. I am still doing basic Actor stuff, remoting and persistence. I liked (and understand ... ;)) the persistant views, as thos views could make state.
but as they will be deprecated, I am hesitant to use them.
Onur Gumus
@OnurGumus
Feb 11 2018 15:18
Speaking of Persistance query, I assume event type mapping supports inheritance right?
event-adapter-bindings {
        "<fully qualified event type name with assembly>" = v1
        "<fully qualified event type name with assembly>" = [v2, tagging]
    }
I don't want to type my 250 events one by one while tagging them
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 17:04
I am not using tags, I am using a simple switch on the subscriber.
I think c# does the inheritance quite well. Somthing simple like:
 switch (msg.Event)
            {
                case NameChanged nameChanged:
                    Handle(nameChanged, msg.PersistenceId);
                    break;
Onur Gumus
@OnurGumus
Feb 11 2018 17:27
@AndreSteenbergen my point was if you define an event adapter for base class, would it be applicable for derived classes. And the answer is yes
@AndreSteenbergen instead of getting first persistenceids then foreach all of them, you can put a tag to all your messages and you will get all at once
Per tag
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 17:29
Ah, that's a great idea
Onur Gumus
@OnurGumus
Feb 11 2018 17:29
this is what the resumable projections example do:
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 17:29
I could create a dummy abstract baseevent, and apply a tag to it
Onur Gumus
@OnurGumus
Feb 11 2018 17:29
or an interface
Interface would be better
Also you can have resumable projections, so if your projection engine crashes, you can continue on restart
this is a must for CQRS read sides
AndreSteenbergen
@AndreSteenbergen
Feb 11 2018 17:31
Yes you are correct.
I am doing the tags first, as it will limit the running tasks as well, every persistenceId creates 1 extra task in my code snippet, by tag fixes that issue
Onur Gumus
@OnurGumus
Feb 11 2018 17:42
you just have to commit the offset and data in a single trx
Stuart Saltzman
@stuartsaltzman
Feb 11 2018 21:09
I am unable to compile the solution on Mac OS X using the build.sh script. I receive the error message below (it is unable to resolve references). Also below is some of build output. I noticed the dockerfile which team city uses is using mono version 5.4.1.7, so I installed that mono version as well and validated that the current version mono version being used. Any thoughts, insight?

** Error Message **
/Users/stuart/Dev/src/dotnet/akka.net/.dotnet/sdk/2.0.0/Microsoft.Common.CurrentVersion.targets(1987,5): warning MSB3245: Could not resolve this reference. Could not locate the assembly "System.IO.Compression.FileSystem". Check to make sure the assembly exists on disk. If this reference is required by your code, you may get compilation errors. [/Users/stuart/Dev/src/dotnet/akka.net/src/core/Akka/Akka.csproj]
CSC : error CS0006: Metadata file '/mscorlib.dll' could not be found [/Users/stuart/Dev/src/dotnet/akka.net/src/core/Akka/Akka.csproj]

** Build Output **
$ ./build.sh build all
Installing .NET CLI...
dotnet-install: Downloading link: https://dotnetcli.azureedge.net/dotnet/Sdk/2.0.0/dotnet-sdk-2.0.0-osx-x64.tar.gz
dotnet-install: Extracting zip from https://dotnetcli.azureedge.net/dotnet/Sdk/2.0.0/dotnet-sdk-2.0.0-osx-x64.tar.gz
dotnet-install: Binaries of dotnet can be found in /Users/stuart/Dev/src/dotnet/akka.net/.dotnet
dotnet-install: Installation finished successfully.
.NET Command Line Tools (2.0.0)

Product Information:
Version: 2.0.0
Commit SHA-1 hash: cdcd1928c9

Runtime Environment:
OS Name: Mac OS X
OS Version: 10.13
OS Platform: Darwin
RID: osx.10.12-x64
Base Path: /Users/stuart/Dev/src/dotnet/akka.net/.dotnet/sdk/2.0.0/

Microsoft .NET Core Shared Framework Host

Version : 2.0.0
Build : e8b8861ac7faf042c87a5c2f9f2d04c98b69f28d

Downloading NuGet...
Feeds used:
/Users/stuart/.nuget/packages/
https://api.nuget.org/v3/index.json


Finished Target: RestorePackages
Starting Target: AssemblyInfo (==> RestorePackages)
Finished Target: AssemblyInfo
Starting Target: Build (==> AssemblyInfo)
dotnet "build" "./src/Akka.sln" --configuration Release
Microsoft (R) Build Engine version 15.3.409.57025 for .NET Core
Copyright (C) Microsoft Corporation. All rights reserved.


/Users/stuart/Dev/src/dotnet/akka.net/.dotnet/sdk/2.0.0/Microsoft.Common.CurrentVersion.targets(1987,5): warning MSB3245: Could not resolve this reference. Could not locate the assembly "System.Configuration". Check to make sure the assembly exists on disk. If this reference is required by your code, you may get compilation errors. [/Users/stuart/Dev/src/dotnet/akka.net/src/core/Akka/Akka.csproj]