These are chat archives for akkadotnet/AkkaStreams

22nd Feb 2017
Paweł Bańka
@pmbanka
Feb 22 2017 13:04

Hi everyone :) I have a few questions related to Persistence Queries.

As I read in the documentation, "Akka Persistence Query (...) can help to migrate data from the write side to the query side database", which is exactly what I want to do. We have a system with some persistent actors storing their events in a SQL Server database (using https://github.com/akkadotnet/Akka.Persistence.SqlServer), and I want to move all those events into ElasticSearch and afterwards keep it updated live. ElasticSearch would then serve as the data store for read queries.

I noticed that Akka.Persistence.Query.Sql does not expose a stream of all events that ever happened in the system. Such a query is proposed in issue akkadotnet/akka.net#2321, but I could not find any further discussion on the topic.

Q1: Would such a feature be acceptable in general, so that I could open a PR with an implementation?

What is available now in the Persistence Query API is a stream of all PersistenceIds and a stream of all events for a particular PersistenceId. Conceptually, the two can be combined with a SelectMany-like operator. If I'm correct, I could use the MergeMany operator for that purpose (http://api.getakka.net/docs/unstable/html/BFD1FE1.htm).

// pseudocode; someSelector and pushToElasticSearch are placeholders
var materializer = ActorMaterializer.Create(actorSystem);
var readJournal = PersistenceQuery.Get(actorSystem)
    .ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
var ids = readJournal.AllPersistenceIds();
var events = ids.MergeMany(???, id =>
    readJournal.EventsByPersistenceId(id, 0L, long.MaxValue).Select(someSelector));
events.RunForeach(pushToElasticSearch, materializer);

A similar idea was described on the Akka mailing list (https://groups.google.com/d/msg/akka-user/l8jbOczKR3k/5eIf4IGnDAAJ), so it seems doable. My concern is that MergeMany accepts a "breadth" parameter, which I don't really understand. Will it only let me consume a finite number of sources? If so, it seems to be a no-go.
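A minimal sketch for probing what `breadth` does, assuming a console project that references only the Akka.Streams package (no persistence plugin). The five ids and the inner sources are hypothetical in-memory stand-ins for `AllPersistenceIds` and `EventsByPersistenceId`; `Source.Maybe` is used here only as a convenient source that never completes. The assumption being tested is that MergeMany (the counterpart of Scala's `flatMapMerge`) caps how many inner sources run *at the same time*, pulling a new id only when a running inner source completes.

```csharp
using System;
using System.Collections.Immutable;
using System.Linq;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class BreadthDemo
{
    // Hypothetical stand-ins for the ids that AllPersistenceIds() would emit.
    private static readonly string[] Ids = { "a", "b", "c", "d", "e" };

    // Inner sources that complete after 3 elements, similar to a non-live query.
    // Each completion frees a slot, so all 5 ids are eventually consumed.
    public static IImmutableList<string> RunFinite(IMaterializer materializer) =>
        Source.From(Ids)
            .MergeMany(2, id => Source.From(Enumerable.Range(1, 3)).Select(n => $"{id}-{n}"))
            .RunWith(Sink.Seq<string>(), materializer)
            .Result;

    // Inner sources that emit one element and then never complete, similar to a
    // live EventsByPersistenceId query. With breadth = 2 no slot is ever freed,
    // so only "a" and "b" get a substream. TakeWithin ends the demo after 1 second.
    public static IImmutableList<string> RunLive(IMaterializer materializer) =>
        Source.From(Ids)
            .MergeMany(2, id => Source.Single(id)
                .Concat(Source.Maybe<string>().MapMaterializedValue(_ => NotUsed.Instance)))
            .TakeWithin(TimeSpan.FromSeconds(1))
            .RunWith(Sink.Seq<string>(), materializer)
            .Result;

    public static void Main()
    {
        var system = ActorSystem.Create("breadth-demo");
        var materializer = ActorMaterializer.Create(system);

        Console.WriteLine($"finite: {RunFinite(materializer).Count} elements");
        Console.WriteLine($"live: {string.Join(",", RunLive(materializer).OrderBy(x => x))}");

        system.Terminate().Wait();
    }
}
```

If that reading of `breadth` is right, the finite case should report all 15 elements, while the live case should only ever see `a` and `b`, because ids beyond the first `breadth` are never pulled while the earlier inner sources stay open.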

Q2: What does "breadth" mean in MergeMany?
Q3: Can I construct a stream of all events using only the built-in streams (AllPersistenceIds and EventsByPersistenceId)?