    Miguel Lemos
    @miguelemosreverte
    @patriknw It is an honor to have you here. I have seen questions similar to the one I just posted here about write throughput with sharded persistent actors. If we manage to solve this problem in our team, we could make a solid example for the community: a proven example ready to deploy on AWS, with Grafana metrics attached. We are using Kafka Localized Processing from your latest contributions to akka-samples! It is fantastic. Thank you all for the contributions you make open source.
    Miguel Lemos
    @miguelemosreverte
    Guys, this is a call for help to the whole akka-persistence-cassandra community: how do you achieve high throughput when writing, say, a million different sharded persistent actors? What have been your experiences with the journal on this, and what are the pitfalls? Please. Our team will provide an open source example in return for your help, with Grafana metrics, an AWS deployment, and a benchmark to prove this journal works well under heavy load and does not take 5 minutes to process a million unique messages. We are already implementing Localized Processing thanks to your contributions to akka-samples. But we are in the process of deploying to production, and if we continue to take 5 minutes per million messages we are going to fail; we are going to have to close down. Again, our specs are 3 AWS t2.2xlarge nodes, 3 Cassandra pods, 3 Akka pods, and one Kafka pod. We are using only 25% of both processing and RAM resources.
    I see people commenting on having lost event persistences; we do lose half of them if not applying backpressure. But even then, we lose 3 out of 10K when using t2.large nodes, and 3K out of a million when using t2.2xlarge nodes. And the cause is the same as some comments point out: Cassandra tuning. Thus, we have both low throughput (1 million per 5 minutes) and we are losing events (3K out of 1 million). We cannot launch production like this. Help is needed, welcome, and will be followed up by us open sourcing the entrails of our benchmarked solution for all the next contributors who face the same difficulties.
    Miguel Lemos
    @miguelemosreverte
    I am going to go out on a limb and dare to invoke the community elite by name. I am so sorry for this. We are going to lose a client, maybe go under if this problem goes unsolved. @/all
    Patrik Nordwall
    @patriknw
    @miguelemosreverte Could you explain what you mean by the events being lost? Is it when writing from EventSourcedBehavior/PersistentActor? What error messages do you see?
    Miguel Lemos
    @miguelemosreverte
    Hello! I will provide a report ASAP
    Miguel Lemos
    @miguelemosreverte

    First report: it shows that, because of the lack of an actor supervisor, when creating lots of sharded PersistentActors some may fail during recovery, and the error message is an AskTimeout. Apart from that, there is the throughput problem: we observe 400K a minute at best during peaks, and because of the intermittent lows it averages to about 200K a minute, giving us the mentioned 1 million messages processed in 5 minutes.

    https://report-for-patrick-nordwall.s3-us-west-2.amazonaws.com/Report+for+Patrick.html

    Patrik Nordwall
    @patriknw
    Using tell is not an option. You will most certainly be able to read faster from Kafka than what the actors and Cassandra journal can handle. You need the backpressure from using ask. Increasing parallelism can help. If the order of replies doesn't matter you can use mapAsyncUnordered+ask.
    Scaling out to more pods should help, but if you only see 25% utilization there might be something else, such as K8s CPU limits (throttling) https://erickhun.com/posts/kubernetes-faster-services-no-cpu-limits/
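    A rough sketch of that approach, assuming an Alpakka Kafka consumer feeding a classic shard region (the shardRegion reference, topic name, and EntityCommand envelope are illustrative placeholders, not from this thread):

    import akka.actor.{ActorRef, ActorSystem}
    import akka.kafka.scaladsl.Consumer
    import akka.kafka.{ConsumerSettings, Subscriptions}
    import akka.pattern.ask
    import akka.stream.scaladsl.Sink
    import akka.util.Timeout
    import org.apache.kafka.common.serialization.StringDeserializer

    import scala.concurrent.duration._

    object KafkaToShardedEntities {
      // placeholder command envelope for the sharded persistent entity
      final case class EntityCommand(entityId: String, payload: String)

      def run(shardRegion: ActorRef)(implicit system: ActorSystem): Unit = {
        implicit val askTimeout: Timeout = 5.seconds

        val consumerSettings =
          ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers("localhost:9092")
            .withGroupId("benchmark-group")

        Consumer
          .plainSource(consumerSettings, Subscriptions.topics("commands"))
          // ask (?) gives backpressure: at most 64 commands are in flight at once,
          // and replies may complete in any order since ordering is not required
          .mapAsyncUnordered(parallelism = 64) { record =>
            shardRegion ? EntityCommand(record.key, record.value)
          }
          .runWith(Sink.ignore)
      }
    }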
    Patrik Nordwall
    @patriknw
    Supervision will not help for the ask timeouts. Those are probably due to Cassandra timing out, or too many actors starting at the same time. You have to retry the ask; that would not be done by supervision.
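    A minimal sketch of retrying the ask with akka.pattern.retry (shardRegion, attempt count, and delay are illustrative placeholders):

    import akka.actor.{ActorRef, ActorSystem, Scheduler}
    import akka.pattern.{ask, retry}
    import akka.util.Timeout

    import scala.concurrent.duration._
    import scala.concurrent.{ExecutionContext, Future}

    object AskWithRetry {
      def send(shardRegion: ActorRef, cmd: Any)(implicit system: ActorSystem): Future[Any] = {
        implicit val ec: ExecutionContext = system.dispatcher
        implicit val scheduler: Scheduler = system.scheduler
        implicit val timeout: Timeout = 10.seconds

        // up to 3 attempts, 2 seconds apart, so a slow recovery or a Cassandra
        // write timeout does not immediately turn into a lost message
        retry(() => shardRegion ? cmd, attempts = 3, delay = 2.seconds)
      }
    }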
    James Nowell
    @jamescnowell

    I can second that tell is not an option, that Kafka will generally consume much faster than the journal can handle, and that you should be sending your acks from inside the persist block to ensure the journal write has completed before you actually acknowledge the message.
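    A minimal sketch of that shape with a classic PersistentActor (AddItem/ItemAdded/Ack are made-up types; acknowledging inside the persist callback is the point):

    import akka.persistence.PersistentActor

    final case class AddItem(item: String)
    final case class ItemAdded(item: String)
    final case class Ack(item: String)

    class ItemEntity extends PersistentActor {
      override def persistenceId: String = "item-" + self.path.name

      private var count = 0

      override def receiveCommand: Receive = {
        case AddItem(item) =>
          persist(ItemAdded(item)) { event =>
            count += 1
            // the reply goes out only after the journal has written the event,
            // so the upstream ask never completes for an unpersisted message
            sender() ! Ack(event.item)
          }
      }

      override def receiveRecover: Receive = {
        case _: ItemAdded => count += 1
      }
    }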

    With regards to the report you posted, we had some bad experiences with mapAsync with greater than 1 level of parallelism, but this is because of some in-order processing requirements. We get more parallelism on the inputs by adding Kafka partitions.

    Additionally, we've been very careful about segregating our dispatchers into more controllable pools, and working very hard to keep things running in the correct dispatcher.

    A lot of things will attempt to run in the default dispatcher, which can become overwhelmed and start failing at critical tasks. I would not at all claim to be an expert on configuring dispatchers, but in my experience it was critical to ensuring stability.
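    For illustration, a dedicated dispatcher is just a named config block (the name and pool sizes here are placeholders, not a recommendation):

    my-app.cassandra-write-dispatcher {
      type = Dispatcher
      executor = "fork-join-executor"
      fork-join-executor {
        parallelism-min = 4
        parallelism-factor = 1.0
        parallelism-max = 8
      }
      throughput = 1
    }

    Actors can then be assigned to it with Props(...).withDispatcher("my-app.cassandra-write-dispatcher"), which keeps heavy work off akka.actor.default-dispatcher.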

    Patrik Nordwall
    @patriknw
    If you are starting many actors at the same time it could also be that the limit of 50 concurrent recoveries is too low: max-concurrent-recoveries in https://github.com/akka/akka/blob/master/akka-persistence/src/main/resources/reference.conf
    But increasing it too much will overload C*.
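    For reference, the setting lives under akka.persistence (50 is the default in reference.conf):

    akka.persistence {
      # how many persistent actors may replay their events at the same time;
      # raising it speeds up mass start-up but adds read load on Cassandra
      max-concurrent-recoveries = 50
    }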
    James Nowell
    @jamescnowell

    I was doing some napkin math on Cassandra throughput when I realized that having 3 nodes in a distributed system in production (either Akka or Cassandra) is a very small and potentially risky number; any single node failure can immediately cause issues building a quorum. Adding more Cassandra pods may help with stability as well as your throughput. That said, I've worked with 3 node systems in production before, so I can understand.

    As mentioned above, "too many actors starting at the same time" was one of the issues we also ran into with very large mapAsync if the messages from kafka were targeting many different Entities. They all attempt recovery from cassandra at the same time and cause additional load. Yes, ^^^ that also may help if it remains a requirement.

    I don't know the exact numbers right now, but we are running 10-20 million persistent actors at several thousand combined TPS (we generate more than is input, so I don't have a good handle on some kind of total TPS right now). We're also heavily using the read-side to re-index data into Solr, our messages tend to be (in my opinion) quite large, and we also serve API requests from the same nodes, so our performance is affected by those things. For reference, our Akka cluster size is between 6 and 10 nodes depending on environment, and I believe our Cassandra cluster (with additional keyspaces and load from other applications) is between 9 and 12 nodes, depending on environment. We do occasionally bump up against the limits of writing to cassandra under heavier loads, but avoid losing messages with a very careful Ask pattern.

    I guess I'd also note that our 6-10 node akka clusters are all 4 core machines with 8gb of ram, and we're intentionally oversized; primarily bound in performance by writing/reading to/from cassandra/solr.
    Miguel Lemos
    @miguelemosreverte
    Lots of feedback. I will proceed to build a basic benchmarking example for this use case, the use case being "How to start up as many sharded persistent actors as possible".
    James Nowell
    @jamescnowell

    Another option you may consider is keeping them in memory much longer with a longer passivation interval. For example, we try to model our entities passivation based on real world usage.

    In some cases, we effectively keep the entities in memory permanently, using persistence as a backup and for the read-side only.
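    A minimal sketch of the passivation knob being described, assuming classic cluster sharding (the 30 minutes value is illustrative; "off" keeps entities in memory indefinitely):

    akka.cluster.sharding {
      # default is 120s; a longer value keeps hot entities in memory between bursts
      passivate-idle-entity-after = 30 minutes
    }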

    Also if the things I'm saying are completely bad or wrong, please let me know! I wouldn't claim to be an expert, just someone who's already been through the struggle
    Miguel Lemos
    @miguelemosreverte
    During the following days I will be posting updates where your feedback is implemented. The updates will contain the benchmarks. For now, I see that we are not going to use tell, we are going to increase concurrent-recoveries, and we are going to add a per-actor dispatcher. We have already tried all these tweaks, but now it will be documented for your eyes, and maybe there will be some 'Aha!' moments where we find the correct configuration which will, for all to see, leave a lasting example of how much throughput can be expected from an Akka application once it reaches production.
    A comment about scaling out: the stress benchmark for our current Cassandra setup showed 11K writes per second. Currently we have only reached peaks of 600K per minute (around 10K writes per second), followed immediately by lows of 300K, so that it averages to 200K per minute: 1 million per 5 minutes, about 3K TPS.
    The journey is now to reach a maximum stable throughput against Cassandra.
    Thank you all already for your valuable feedback; let's turn that feedback into an open sourced, well documented repository. I will do so during the next few days. Thanks again.
    Milan van der Meer
    @milanvdm
    I'm getting a Persistence failure when replaying events for persistenceId [con_X1YF9E0AAI8Ae6ba]. Last known sequence number [1] java.lang.IllegalStateException: Sequence number [2] still missing after [10.00 s], saw unexpected seqNr [3] for persistenceId [con_X1YF9E0AAI8Ae6ba]. while the events table sequence_nrs are correctly going from 1 to 4.
    I can also decode the blob for sequence_nr 2, so I'm not really sure why it is complaining. Is there any other reason I'm missing for why this error could happen?
    Patrik Nordwall
    @patriknw
    Sounds very strange. What version is that?
    Patrik Nordwall
    @patriknw
    @milanvdm I was thinking that it could be ordered by the timestamp timeuuid, but it's ordered by sequence_nr first. Could you anyway check the writer_uuid and timestamp columns of those rows?
    brabo-hi
    @brabo-hi

    We've recently tested against AWS keyspaces and the latest version works fine

    Are you able to run Lagom (Akka Persistence write side) with AWS Keyspaces?

    brabo-hi
    @brabo-hi
    Hi all, I added a new event type to an Akka persistent actor. My events are serialized using Jackson. I forgot to add the @JsonSubTypes
    annotation for the newly created event, and now my journal is corrupted. Here is an example:
    akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [80]. PersistenceId [CustomerEntity|42], due to: Exception during recovery. Last known sequence number [80]. PersistenceId [CustomerEntity|42], due to: Could not resolve type id 'CustomerEntity$Event$ShareAdded' as a subtype of `CustomerEntity$Event$ShareAdded`: known type ids = [AccountCLosed, Credited, Debited]
    How can I fix this?
    Patrik Nordwall
    @patriknw
    @brabo-hi maybe you can transform the json to what is expected with the JacksonMigration https://doc.akka.io/docs/akka/current/serialization-jackson.html#structural-changes
    brabo-hi
    @brabo-hi
    @patriknw thank you for sharing the link. I think for some entities I have some events that are corrupted in the journal. Is it possible to delete or fix them?
    brabo-hi
    @brabo-hi
    @patriknw is it possible to skip failed events during recovery?
    Patrik Nordwall
    @patriknw
    @brabo-hi Isn't that exception from deserializing the event? Then you can use the JacksonMigration I linked to. With that you can modify the json tree, changing the invalid json to something that your classes can deserialize. It could even swap that to a dummy event that the actor can ignore.
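    A minimal sketch of that idea, assuming hypothetical com.example class names; the migration rewrites the stored JSON tree into something the current event class can deserialize:

    import akka.serialization.jackson.JacksonMigration
    import com.fasterxml.jackson.databind.JsonNode
    import com.fasterxml.jackson.databind.node.ObjectNode

    class ShareAddedMigration extends JacksonMigration {
      override def currentVersion: Int = 2

      override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
        val root = json.asInstanceOf[ObjectNode]
        if (fromVersion < 2) {
          // supply whatever the old JSON is missing, or reshape it into a
          // dummy event that the entity simply ignores
          root.put("shares", 0)
        }
        root
      }
    }

    The migration is then registered per event class in configuration:

    akka.serialization.jackson.migrations {
      "com.example.CustomerEntity$Event$ShareAdded" = "com.example.ShareAddedMigration"
    }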
    brabo-hi
    @brabo-hi
    Thanks @patriknw, that was helpful.
    Pavel Pliachystsik
    @pavel.pliachystsik_gitlab
    Can someone say why my events-by-tag queries show me this error:
    7680 missing tagged events for tag [AUDIT_TAG]. Failing without search, stopping.
    Pavel Pliachystsik
    @pavel.pliachystsik_gitlab
    Can someone help me fight missing sharding info in my messages? I'm using cluster sharding in persistence mode, and after 40 days the sharding events are disappearing; after a restart, my cluster sharding won't wake up and fails with a persistence error saying that I have an event with id 3, but no 0. In persistence I'm using fail-on-error mode, and I want to keep it in case I get persistence errors in my business logic actors. I tried manually adding info about the last events in the metadata, but that doesn't work.
    Haemin Yoo
    @yoohaemin
    Hi, we are evaluating an upgrade from 0.95 to 0.102+, so that we can take advantage of cleanup-old-persistence-ids, the application was leaking memory otherwise.
    However, when the cleanup job is run, we observe an IllegalStateException saying Unable to find missing tagged eventTag: .... Missing: .... Previous offset: ....
    Could someone give me pointers to where to look?
    Zhenhao Li
    @Zhen-hao

    I see the following with version 1.0.3

    Could not load org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal: java.lang.ClassNotFoundException: org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal {}
    [info] java.lang.ClassNotFoundException: org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
    [info]  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    [info]  at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    [info]  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    [info]  at java.base/java.lang.Class.forName0(Native Method)
    [info]  at java.base/java.lang.Class.forName(Class.java:315)
    [info]  at com.datastax.oss.driver.internal.core.util.Reflection.loadClass(Reflection.java:55)
    [info]  at com.datastax.oss.driver.internal.core.util.DependencyCheck.isPresent(DependencyCheck.java:68)
    [info]  at com.datastax.oss.driver.internal.core.context.DefaultDriverContext.buildRequestProcessorRegistry(DefaultDriverContext.java:533)
    [info]  at com.datastax.oss.driver.internal.core.util.concurrent.LazyReference.get(LazyReference.java:55)
    [info]  at com.datastax.oss.driver.internal.core.context.DefaultDriverContext.getRequestProcessorRegistry(DefaultDriverContext.java:863)
    [info]  at com.datastax.oss.driver.internal.core.session.DefaultSession.<init>(DefaultSession.java:121)
    [info]  at com.datastax.oss.driver.internal.core.session.DefaultSession.init(DefaultSession.java:88)
    [info]  at com.datastax.oss.driver.api.core.session.SessionBuilder.buildDefaultSessionAsync(SessionBuilder.java:665)
    [info]  at com.datastax.oss.driver.api.core.session.SessionBuilder.buildAsync(SessionBuilder.java:598)
    [info]  at akka.stream.alpakka.cassandra.DefaultSessionProvider.connect(CqlSessionProvider.scala:53)
    [info]  at akka.stream.alpakka.cassandra.scaladsl.CassandraSession.<init>(CassandraSession.scala:53)
    [info]  at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.startSession(CassandraSessionRegistry.scala:99)
    [info]  at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.$anonfun$sessionFor$1(CassandraSessionRegistry.scala:84)
    [info]  at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
    [info]  at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:84)
    [info]  at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:74)
    [info]  at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:64)
    [info]  at akka.persistence.cassandra.journal.CassandraJournal.<init>(CassandraJournal.scala:88)
    [info]  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    [info]  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    [info]  at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    [info]  at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    [info]  at akka.util.Reflect$.instantiate(Reflect.scala:73)
    [info]  at akka.actor.ArgsReflectConstructor.produce(IndirectActorProducer.scala:101)
    [info]  at akka.actor.Props.newActor(Props.scala:226)
    [info]  at akka.actor.ActorCell.newActor(ActorCell.scala:613)
    [info]  at akka.actor.ActorCell.create(ActorCell.scala:640)
    [info]  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:513)
    [info]  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:535)
    [info]  at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:295)
    [info]  at akka.dispatch.Mailbox.run(Mailbox.scala:230)
    [info]  at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    [info]  at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    [info]  at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    ...

    what can be the cause?

    Enno
    @ennru
    Alpakka explicitly excludes the Tinkerpop library. What is your session configuration when you get this? https://github.com/akka/alpakka/pull/2201#issuecomment-599454639
    Zhenhao Li
    @Zhen-hao
    I have
    alpakka.cassandra {
      # The implementation of `akka.stream.alpakka.cassandra.CqlSessionProvider`
      # used for creating the `CqlSession`.
      # It may optionally have a constructor with an `ClassicActorSystemProvider` and `Config` parameters.
      session-provider = "akka.stream.alpakka.cassandra.DefaultSessionProvider"
    
      # Configure Akka Discovery by setting a service name
      service-discovery {
        name = ""
        lookup-timeout = 1 s
      }
    
      # The ExecutionContext to use for the session tasks and future composition.
      session-dispatcher = "akka.actor.default-dispatcher"
    
      # Full config path to the Datastax Java driver's configuration section.
      # When connecting to more than one Cassandra cluster different session configuration can be
      # defined with this property.
      # See https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/#quick-overview
      # and https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
      datastax-java-driver-config = "datastax-java-driver"
    }
    Enno
    @ennru
    The stack trace shows it, but it doesn't fail? It looks like the driver just checks whether it is available: DependencyCheck.isPresent.
    Dan Di Spaltro
    @dispalt
    I see there are some notes about Amazon Keyspaces that aren't published, and it appears not to work, is that correct? Because it says it doesn't work, and then goes into detail about what you should change in the config to make it work.
    Haemin Yoo
    @yoohaemin
    Hi, does anyone have any ideas on Unable to find missing tagged eventTag: .... Missing: .... Previous offset: .... errors when cleanup-old-persistence-ids is run? I asked above a few weeks ago but we're still trying to figure this out. Thanks :)
    nolah
    @nolah

    Hi Akka team, I would like to report that I faced a similar issue to the one stated above, with missing events when using the Cassandra persistence query.
    Relevant log: java.lang.RuntimeException: 97124 missing tagged events for tag [TAG]. Failing without search.
    My configuration is pretty basic and follows the reference conf:
    events-by-tag {
      bucket-size = "Hour"
      eventual-consistency-delay = 2s
      flush-interval = 25ms
      pubsub-notification = on
      first-time-bucket = "20201001T00:00"
      max-message-batch-size = 20
      scanning-flush-interval = 5s
      verbose-debug-logging = true
      max-missing-to-search = 5000
      gap-timeout = 5s
      new-persistence-id-scan-timeout = 0s
    }

    We are deploying on AWS Keyspaces; at the moment there are 7-8 persistent actors.
    We are using currentEventsByTag (with Offset) to implement a data polling endpoint.

    This issue only happens when the offset is set to the very end of the previous hour, i.e. the 59th minute and 31+ seconds of the previous hour. After the clock moves into the next hour, retrying the same query does not cause this error.
    Example: if now is 14:05:00 and I try to get data from 15:59:59, I get an error in the logs that 97124 events are missing, but after we "fixed" this it turned out that there were only 59 events for that query, which was confirmed in the database.

    There were two ways to "fix" this: either setting max-missing-to-search to 100K or 1M, which would be fine I guess if I knew that this number will never grow past 97124, or setting new-persistence-id-scan-timeout = 0s per the advice from @pavel.pliachystsik_gitlab.
    I chose the latter, but I am still not sure that it's a correct fix.
    I kindly ask you to provide any feedback, as I couldn't find anything about this case in the documentation. Thanks.
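    For context, a minimal sketch of the kind of query being described (the tag name and offset handling are placeholders):

    import akka.actor.ActorSystem
    import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
    import akka.persistence.query.{NoOffset, PersistenceQuery}
    import akka.stream.scaladsl.Sink

    object TagPoller {
      def pollOnce()(implicit system: ActorSystem): Unit = {
        val queries = PersistenceQuery(system)
          .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

        // currentEventsByTag completes when it reaches the current end of the
        // tag view, so an offset close to the end of a time bucket is exactly
        // where the missing-events search can kick in
        queries
          .currentEventsByTag("TAG", NoOffset)
          .runWith(Sink.foreach(env => println(s"${env.offset}: ${env.event}")))
      }
    }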

    Dan Di Spaltro
    @dispalt
    @nolah I think you should file a ticket
    pruthvi578
    @pruthvi578

    I get the below error when trying to configure Akka Cassandra for persistence:
    [rajupru@ftc-svt-dev-sdap-dev-vm hb_movecervice]$ Uncaught error from thread [Uncaught error from thread [SimplivityHeartbeatInfoSightCluster-cassandra-plugin-default-dispatcher-25]: com/codahale/metrics/JmxReporter, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[SimplivityHeartbeatInfoSightCluster]
    java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter
      at com.datastax.driver.core.Metrics.<init>(Metrics.java:146)
      at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1501)
      at com.datastax.driver.core.Cluster.init(Cluster.java:208)
      at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:376)
      at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:355)
      at akka.persistence.cassandra.ConfigSessionProvider$$anonfun$connect$1.apply(ConfigSessionProvider.scala:50)
      at akka.persistence.cassandra.ConfigSessionProvider$$anonfun$connect$1.apply(ConfigSessionProvider.scala:44)
      at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:253)
      at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
      at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
      at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
      at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:92)
      at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
      at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
      at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
      at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
      at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
      at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
      at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Uncaught error from thread [SimplivityHeartbeatInfoSightCluster-cassandra-plugin-default-dispatcher-24]: com/codahale/metrics/JmxReporter, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[SimplivityHeartbeatInfoSightCluster]
    java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter
    Please help, how can I resolve this?
    I have added the dependencies below for the Akka Cassandra build:
    val akkaCsdraPrst = "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.104"
    val dropwizardMetics = "io.dropwizard.metrics" % "metrics-core" % "4.0.0"
    val dropwizardMetricsJmx = "io.dropwizard.metrics" % "metrics-jmx" % "4.0.0"

    In application.conf:
    cassandra-journal {
      contact-points = ["127.0.0.1:9042"]
      authentication.username = "test"
      authentication.password = "password"
    }

    cassandra-snapshot-store {
      keyspace = "move-service_test"
      contact-points = ["127.0.0.1:9042"]
      authentication.username = "test"
      authentication.password = "password"
    }

    datastax-java-driver.advanced.reconnect-on-init = true
    datastax-java-driver {
      basic.contact-points = ["127.0.0.1:9042"]
    }

    akka {
      persistence {
        journal.plugin = "cassandra-journal"
        snapshot-store.plugin = "cassandra-snapshot-store"

        cassandra {
          journal.keyspace = "move_service_test"
          journal.authentication.username = "test"
          journal.authentication.password = "password"
        }
      }
    }

    Zhenhao Li
    @Zhen-hao
    Does the Cassandra plugin support "persist multiple events" as in def persist[Event, State](events: im.Seq[Event]): EffectBuilder[Event, State]?
    I've found some strange behavior.
    Zhenhao Li
    @Zhen-hao
    I have a case of Effect.persist(Seq(event1)) that is supposed to bring the state from s0 to s1. I can see event1 gets persisted and handled, and I can see in the logs that s1 is the output of the event handler. However, the final state is still s0.
    Zhenhao Li
    @Zhen-hao
    Never mind, it is a bug caused by a wrong variable reference: there is always an update event followed by a deletion event, so the total effect is no change.
    Zhenhao Li
    @Zhen-hao
    So, as a side effect, I can confirm that persisting multiple events works just fine.
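    For anyone landing here later, a minimal sketch of persisting several events from one command with the typed API (the Counter names are made up for illustration):

    import akka.persistence.typed.PersistenceId
    import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}

    object Counter {
      sealed trait Command
      case object IncrementTwice extends Command

      sealed trait Event
      case object Incremented extends Event

      final case class State(value: Int)

      def apply(id: String): EventSourcedBehavior[Command, Event, State] =
        EventSourcedBehavior[Command, Event, State](
          persistenceId = PersistenceId.ofUniqueId(id),
          emptyState = State(0),
          commandHandler = (_, _) =>
            // both events are stored in one atomic write and then applied in order
            Effect.persist(List[Event](Incremented, Incremented)),
          eventHandler = (state, _) => state.copy(value = state.value + 1))
    }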