    Christopher Batey
    @brabo-hi I don't think so, as it isn't binary compatible with pre-1.0 versions
    We've recently tested against AWS keyspaces and the latest version works fine
    @anilkumble you can also use the Reconciliation class to delete from the tag related tables
    Okay @chbatey. Will try that
    Hello everyone, we are trying to configure our Cassandra connection to a Cassandra cluster (hosted by Aiven) providing only a CA Certificate (ca.pem). Could someone please tell us how to proceed? Thank you.
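One possible approach (a sketch only, not verified against Aiven's setup; paths, alias, and password are placeholders): import ca.pem into a JKS truststore, then point the DataStax 4.x driver at it through its ssl-engine-factory settings:

```hocon
# First, on the command line (hypothetical paths):
#   keytool -importcert -alias cassandra-ca -file ca.pem \
#           -keystore truststore.jks -storepass changeit
datastax-java-driver {
  advanced.ssl-engine-factory {
    class = DefaultSslEngineFactory
    truststore-path = "/path/to/truststore.jks"
    truststore-password = "changeit"
  }
}
```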
    We are trying to use Akka Persistence Cassandra eventsByTag to build a read-side model. I have a few questions regarding compatibility between the write (persistent) side and the read side: (1) If the write side and read side have different bucket-size configured, will it cause problems (e.g. missing events on the read side)? (2) Must the read side and write side use the same version of the akka-persistence-cassandra plugin (lib)? TIA.
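For reference, the bucket-size in question is a plugin setting (path as in akka-persistence-cassandra 1.x; the value shown is the default). Because the eventsByTag reader derives time buckets from the same setting the writer used, the two sides generally need to agree on it, and it should not be changed after events have been written:

```hocon
akka.persistence.cassandra.events-by-tag {
  # One of "Day", "Hour", "Minute"; must match what the write side used
  bucket-size = "Hour"
}
```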

    akka.pattern.CircuitBreaker$$anon$1: Circuit Breaker Timed out.

    How can I resolve this exception? I am getting it in the onPersistFailure callback.
    Kindly help me with this.
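For context, that timeout comes from the journal plugin's circuit breaker in Akka persistence; its settings can be tuned in configuration (values below are the defaults except the raised call-timeout, which only buys headroom; the underlying cause is usually a slow or overloaded journal backend):

```hocon
akka.persistence.journal-plugin-fallback.circuit-breaker {
  max-failures = 10   # default
  call-timeout = 20s  # default is 10s; raise only if journal writes legitimately take longer
  reset-timeout = 30s # default
}
```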

    When running Akka Persistence Cassandra eventsByTag, we randomly get the error message "xxxx missing tagged events for tag [tag-3]. Failing without search". Could you please shed some light on where I should look? TIA.
    Simon D
    Has anyone run performance tests on Cassandra with an Akka persistence workload with a lot of eventsByTag queries? What tools were used? What were the outcomes, and what Cassandra tuning worked best on the cluster side and on the driver side? I will have to run one soon and it would be nice to have some input :)
    I will share my results when I have some

    I just started with Lagom & Akka. I am following the design described in Domain Modeling with Akka Persistence Typed

    I am trying to create a brand new instance of an entity (EntityState). But the event is not getting persisted, and I am getting the following error:

    00:54:27.862 [error] com.example.impl.entity.EntityClass [persistencePhase=running-cmd, akkaAddress=akka://XXX@, akkaSource=akka://XXX/system/sharding/StateClass/186/ID1, sourceActorSystem=XXX, persistenceId=StateClass|ID1] - Supervisor StopSupervisor saw failure: null
    java.lang.NullPointerException: null
    at akka.persistence.typed.javadsl.EventSourcedBehavior.$anonfun$apply$4(EventSourcedBehavior.scala:195)
    at akka.persistence.typed.internal.Running$RunningState.applyEvent(Running.scala:78)
    at akka.persistence.typed.internal.Running$HandlingCommands.applyEffects(Running.scala:153)
    at akka.persistence.typed.internal.Running$HandlingCommands.onCommand(Running.scala:123)
    at akka.persistence.typed.internal.Running$HandlingCommands.onMessage(Running.scala:105)
    at akka.persistence.typed.internal.Running$HandlingCommands.onMessage(Running.scala:100)
    at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:83)

    I have a Create command, which invokes onCreate(), and eventually attempts to persist an EntityCreated event.

    Service Impl method

    public ServiceCall<CreateMessage, StateView> createState() {
        return message ->
                entityRef // note: the entity ref lookup was missing from the original snippet
                        .<EntityClass.Accepted>ask(replyTo -> new EntityClass.Create(message, replyTo), askTimeout)
                        .thenApply(accepted -> toStateView(accepted.getSummary()));
    }

    Command handler:

    private ReplyEffect<Event, StateClass> onCreate(StateClass state, Create cmd) {
        return Effect()
                .persist(new EntityCreated(cmd.getDetails().getName(), Instant.now()))
                .thenReply(cmd.replyTo, e -> new Accepted(EntityClass.toSummary(e)));
    }

    I am able to confirm the following:

    • the exception is thrown during persist()
    • the event is not present in Cassandra

    Your help is appreciated. Thank you in advance!
    Miguel Lemos
    Hello there!
    Miguel Lemos
    We are about to reach production using Akka! But we have a question on how to achieve max throughput when writing events using Akka Persistence Cassandra. We are consistently achieving spikes of 400K persists per minute, with lows in between because of how batch writes work: 15 seconds of silence, 15 seconds of 400K writes, and so on. All in all, it can process 1 million messages per 5 minutes. This is on 3 t2.2xlarge nodes in AWS, where we have 3 Cassandra pods. Having 1 or 3 Akka pods makes no difference, as the bottleneck does not allow us to process any further. If it were not for this bottleneck, we would be seeing Akka at its max potential, which we have seen is around 1 million to 1.5 million messages a minute on this infrastructure. But that speed we can only achieve by using tell instead of ask, so that once a message gets deserialized it is sent off to an actor. We lose messages this way, because there is no backpressure: half of the persists get thrown away, never completed. Thus, we need to stay with the ask pattern and ensure the bottleneck is dealt with. How would you, community, go about this challenge? Imagine one message per sharded actor, where each actor must save the event. A million sharded persistent actors. How can we create them faster than 5 minutes?
    @patriknw It is an honor to have you here. I have seen questions similar to the one I just posted about write throughput for sharded persistent actors. If we manage to solve this problem in our team, we could make a solid example for the community: a proven setup ready to deploy on AWS, with Grafana metrics attached. We are using Kafka Localized Processing, from your latest contributions to akka-samples! It is fantastic. Thank you all for the contributions you make open source.
    Miguel Lemos
    Guys, this is a call for help to the whole akka-persistence-cassandra community: how do you achieve high throughput when writing to, say, a million different sharded persistent actors? What have been your experiences with the journal, and what are the pitfalls? Please. In return for your help, our team will provide an open-source example with Grafana metrics, an AWS deployment, and a benchmark to prove this journal works well under heavy load and does not take 5 minutes to process a million unique messages. We are already implementing Localized Processing thanks to your contributions to akka-samples. But we are in the process of deploying to production, and if we continue to take 5 minutes per million messages we are going to fail; we may have to close down. Again, our specs are 3 AWS t2.2xlarge nodes, 3 Cassandra pods, 3 Akka pods, and one Kafka pod. We are using only 25% of both CPU and RAM.
    I see people commenting on having lost event persists; we lose half of them if we do not apply backpressure. But even then, we lose 3 out of 10K when using t2.large nodes, and 3K out of a million when using t2.2xlarge nodes. And the cause is the same as some comments point out: Cassandra tuning. Thus, we have both low throughput (1 million per 5 minutes) and we are losing events (3K out of 1 million). We cannot launch production like this. Help is needed and welcome, and will be followed up by us open-sourcing the entrails of our benchmarked solution for the next contributors who face the same difficulties.
    Miguel Lemos
    I am going to go out on a limb and dare to invoke the community elite by name. I am so sorry for this. We are going to lose a client, and maybe go under, if this problem goes unsolved. @/all
    Patrik Nordwall
    @miguelemosreverte Could you explain what you mean by that the events are lost? Is it when writing from EventSourcedBehavior/PersistentActor? What error messages do you see?
    Miguel Lemos
    Hello! I will provide a report ASAP
    Miguel Lemos

    First report: it shows that, for lack of an actor supervisor, when creating lots of sharded persistent actors some may fail during recovery, and the error message is an AskTimeout. Apart from that, there is the throughput problem: we observe 400K a minute at best during peaks, and because of the intermittent lows it averages about 200K a minute, giving us the mentioned 1 million messages processed in 5 minutes.


    Patrik Nordwall
    Using tell is not an option. You will most certainly be able to read faster from Kafka than what the actors and Cassandra journal can handle. You need the backpressure from using ask. Increasing parallelism can help. If the order of replies doesn’t matter you can use mapAsyncUnordered+ask.
    Scaling out to more pods should help, but if you only see 25% utilization there might be something else, such as K8s CPU limits (throttling): https://erickhun.com/posts/kubernetes-faster-services-no-cpu-limits/
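What makes mapAsyncUnordered+ask provide backpressure is the bound on in-flight asks: the stage will not pull a new message from Kafka until a slot frees up. This self-contained JDK sketch (plain java.util.concurrent, not Akka Streams; all names and numbers are made up) shows the same bounded-in-flight principle with a semaphore:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedAsks {
    static final int PARALLELISM = 4; // the bound, like mapAsyncUnordered(parallelism = 4)

    // Returns the maximum number of simultaneously in-flight "asks" observed.
    public static int process(int messages) throws InterruptedException {
        Semaphore inFlight = new Semaphore(PARALLELISM);
        AtomicInteger current = new AtomicInteger();
        AtomicInteger maxObserved = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        CountDownLatch done = new CountDownLatch(messages);
        for (int i = 0; i < messages; i++) {
            inFlight.acquire(); // backpressure: the producer blocks here when 4 asks are pending
            int now = current.incrementAndGet();
            maxObserved.accumulateAndGet(now, Math::max);
            CompletableFuture.runAsync(() -> { // the simulated journal write / ask
                try { Thread.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            }, pool).whenComplete((v, t) -> {
                current.decrementAndGet();
                inFlight.release(); // completion frees a slot, letting the next message in
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        return maxObserved.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("max in-flight = " + process(200)); // never exceeds PARALLELISM
    }
}
```

Once the bound is hit, the producer blocks in acquire(); in a real stream that blocking translates into the upstream Kafka consumer slowing down instead of messages being dropped.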
    Patrik Nordwall
    Supervision will not help for the ask timeouts. That’s probably due to Cassandra timing out, or too many actors starting at the same time. You have to retry the ask. That would not be done by supervision.
    James Nowell

    I can second that tell is not an option, that Kafka will generally consume much faster than the journal can handle, and that you should be sending your acks from inside the persist block to ensure the journal has written before you actually acknowledge the message.

    With regards to the report you posted, we had some bad experiences with mapAsync with greater than 1 level of parallelism, but this is because of some in-order processing requirements. We get more parallelism on the inputs by adding Kafka partitions.

    Additionally, we've been very careful about segregating our dispatchers into more controllable pools, and working very hard to keep things running in the correct dispatcher.

    A lot of things will attempt to run in the default dispatcher, which can become overwhelmed and start failing at critical tasks. I would not at all claim to be an expert on configuring dispatchers, but in my experience it was critical to ensuring stability.
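A minimal sketch of that dispatcher segregation (the name and pool size are hypothetical and need load-testing; actors or streams are then pinned to it explicitly, e.g. with .withDispatcher("cassandra-blocking-dispatcher")):

```hocon
cassandra-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor.fixed-pool-size = 16
  throughput = 1
}
```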

    Patrik Nordwall
    If you are starting many actors at the same time it could also be that the limit of 50 concurrent recoveries is too low: max-concurrent-recoveries in https://github.com/akka/akka/blob/master/akka-persistence/src/main/resources/reference.conf
    But increasing it too much will overload C*.
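The relevant knob from that reference.conf, for the record (the raised value is only illustrative, given the caveat about overloading C*):

```hocon
akka.persistence {
  # default is 50; raising it speeds up mass cold-starts at the cost of
  # more simultaneous recovery load on Cassandra
  max-concurrent-recoveries = 150
}
```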
    James Nowell

    I was doing some napkin math on Cassandra throughput when I realized that having 3 nodes in a distributed system in production (either Akka or Cassandra) is a very small and potentially risky number; any single node failure can immediately cause issues building a quorum. Adding more Cassandra pods may help with stability as well as your throughput. That said, I've worked with 3 node systems in production before, so I can understand.

    As mentioned above, "too many actors starting at the same time" was one of the issues we also ran into with very large mapAsync if the messages from kafka were targeting many different Entities. They all attempt recovery from cassandra at the same time and cause additional load. Yes, ^^^ that also may help if it remains a requirement.

    I don't know the exact numbers right now, but we are running 10-20 million persistent actors at several thousand combined TPS (we generate more than is input, so I don't have a good handle on some kind of total TPS right now). We're also heavily using the read-side to re-index data into Solr, our messages tend to be (in my opinion) quite large, and we also serve API requests from the same nodes, so our performance is affected by those things. For reference, our Akka cluster size is between 6 and 10 nodes depending on environment, and I believe our Cassandra cluster (with additional keyspaces and load from other applications) is between 9 and 12 nodes, depending on environment. We do occasionally bump up against the limits of writing to cassandra under heavier loads, but avoid losing messages with a very careful Ask pattern.

    I guess I'd also note that our 6-10 node akka clusters are all 4 core machines with 8gb of ram, and we're intentionally oversized; primarily bound in performance by writing/reading to/from cassandra/solr.
    Miguel Lemos
    Lots of feedback. I will proceed to build a basic benchmarking example for this use case, the use case being: "How to start up as many sharded persistent actors as possible".
    James Nowell

    Another option you may consider is keeping them in memory much longer with a longer passivation interval. For example, we try to model our entities passivation based on real world usage.

    In some cases, we simply keep the entities in memory permanently, effectively, using persistence as a backup, and for the read-side only.
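The passivation interval mentioned above is a single sharding setting (the value shown is illustrative; 120 seconds is the default in recent Akka versions):

```hocon
# keep idle entities in memory for two hours instead of the default
akka.cluster.sharding.passivate-idle-entity-after = 2 hours
# or set it to "off" to keep entities in memory until explicitly passivated
```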

    Also if the things I'm saying are completely bad or wrong, please let me know! I wouldn't claim to be an expert, just someone who's already been through the struggle
    Miguel Lemos
    During the following days I will be posting updates where your feedback is implemented. The updates will contain the benchmarks. For now, I see that we are not going to use tell, we are going to increase concurrent recoveries, and we will add per-actor dispatchers. We have already tried all these tweaks, but now it is going to be documented for your eyes, and maybe there will be some 'aha!' moments where we find the correct configuration which will, for all to see, leave a lasting example of how much throughput can be expected from an Akka application once it reaches production.
    Regarding scaling out: the stress benchmark for our current Cassandra setup showed 11K writes per second. Currently we have only reached peaks of 600K per minute (around 10K writes per second), followed immediately by lows of 300K, so that it averages 200K per minute: 1 million per 5 minutes, about 3K TPS.
    The journey is now to reach a maximum stable throughput against Cassandra.
    Thank you all already for your valuable feedback; let's turn that feedback into an open-sourced, well-documented repository. Over the next few days I will do so. Thanks again.
    Milan van der Meer
    I'm getting a Persistence failure when replaying events for persistenceId [con_X1YF9E0AAI8Ae6ba]. Last known sequence number [1] java.lang.IllegalStateException: Sequence number [2] still missing after [10.00 s], saw unexpected seqNr [3] for persistenceId [con_X1YF9E0AAI8Ae6ba]. while the sequence_nrs in the events table go nicely from 1 to 4.
    I can also decode the blob of sequence_nr 2, so I'm not really sure why it is complaining. Is there any other reason I'm missing why this error could happen?
    Patrik Nordwall
    Sounds very strange. What version is that?
    Patrik Nordwall
    @milanvdm I was thinking that it could be ordered by the timestamp timeuuid, but it's ordered by sequence_nr first. Could you anyway check the writer_uuid and timestamp columns of those rows?

    We've recently tested against AWS keyspaces and the latest version works fine

    Are you able to run Lagom (akka persistence write side) with AWS Keyspaces?

    Hi all, I added a new event type to an Akka persistent actor. My events are serialized using Jackson. I forgot to add the @JsonSubTypes
    annotation to the newly created event, and now my journal is corrupted. Here is an example:
    akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [80]. PersistenceId [CustomerEntity|42], due to: Exception during recovery. Last known sequence number [80]. PersistenceId [CustomerEntity|42], due to: Could not resolve type id 'CustomerEntity$Event$ShareAdded' as a subtype of `CustomerEntity$Event$ShareAdded`: known type ids = [AccountCLosed, Credited, Debited]
    How can I fix this?
    Patrik Nordwall
    @brabo-hi maybe you can transform the JSON into what is expected with JacksonMigration: https://doc.akka.io/docs/akka/current/serialization-jackson.html#structural-changes
    @patriknw thank you for sharing the link. I think for some entities I have events that are corrupted in the journal. Is it possible to delete or fix them?
    @patriknw is it possible to skip failed events during recovery?
    Patrik Nordwall
    @brabo-hi Isn't that exception from deserializing the event? Then you can use the JacksonMigration I linked to. With it you can modify the JSON tree, changing the invalid JSON into something that your classes can deserialize. It could even be swapped for a dummy event that the actor can ignore.
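A rough sketch of what such a migration could look like (class name, field names, and version numbers are hypothetical; the migration class is registered against the event's class name under akka.serialization.jackson.migrations in config, and the snippet requires the akka-serialization-jackson and Jackson libraries on the classpath):

```java
import akka.serialization.jackson.JacksonMigration;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Hypothetical migration: rewrites the unreadable event payload into a
// dummy shape that the entity's event handler can deserialize and ignore.
public class ShareAddedMigration extends JacksonMigration {

    @Override
    public int currentVersion() {
        return 2;
    }

    @Override
    public JsonNode transform(int fromVersion, JsonNode json) {
        ObjectNode root = (ObjectNode) json;
        if (fromVersion < 2) {
            // Replace whatever could not be resolved with a no-op marker field;
            // the resulting dummy event is then ignored by the actor.
            root.removeAll();
            root.put("ignored", true);
        }
        return root;
    }
}
```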
    Thanks @patriknw, that was helpful.
    Pavel Pliachystsik
    Can someone say why my events-by-tag queries show this error:
    7680 missing tagged events for tag [AUDIT_TAG]. Failing without search, stopping.
    Pavel Pliachystsik
    Can someone help me fight missing sharding info in my messages? I'm using cluster sharding in persistence mode, and after 40 days sharding events are disappearing; after a restart my cluster sharding won't wake up and will fail with a persistence error saying that I have an event with id 3, but no 0. In persistence I'm using fail-on-error mode, and I want to keep it for persistence errors in my business-logic actors. I was trying to manually add metadata info about the last events, but that doesn't work.
    Haemin Yoo
    Hi, we are evaluating an upgrade from 0.95 to 0.102+ so that we can take advantage of cleanup-old-persistence-ids; the application was leaking memory otherwise.
    However, when the cleanup job runs, we observe an IllegalStateException saying Unable to find missing tagged eventTag: .... Missing: .... Previous offset: ....
    Could someone give me pointers on where to look?
    Zhenhao Li

    I see the following with version 1.0.3

    Could not load org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal: java.lang.ClassNotFoundException: org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal {}
    [info] java.lang.ClassNotFoundException: org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
    [info]  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    [info]  at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    [info]  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    [info]  at java.base/java.lang.Class.forName0(Native Method)
    [info]  at java.base/java.lang.Class.forName(Class.java:315)
    [info]  at com.datastax.oss.driver.internal.core.util.Reflection.loadClass(Reflection.java:55)
    [info]  at com.datastax.oss.driver.internal.core.util.DependencyCheck.isPresent(DependencyCheck.java:68)
    [info]  at com.datastax.oss.driver.internal.core.context.DefaultDriverContext.buildRequestProcessorRegistry(DefaultDriverContext.java:533)
    [info]  at com.datastax.oss.driver.internal.core.util.concurrent.LazyReference.get(LazyReference.java:55)
    [info]  at com.datastax.oss.driver.internal.core.context.DefaultDriverContext.getRequestProcessorRegistry(DefaultDriverContext.java:863)
    [info]  at com.datastax.oss.driver.internal.core.session.DefaultSession.<init>(DefaultSession.java:121)
    [info]  at com.datastax.oss.driver.internal.core.session.DefaultSession.init(DefaultSession.java:88)
    [info]  at com.datastax.oss.driver.api.core.session.SessionBuilder.buildDefaultSessionAsync(SessionBuilder.java:665)
    [info]  at com.datastax.oss.driver.api.core.session.SessionBuilder.buildAsync(SessionBuilder.java:598)
    [info]  at akka.stream.alpakka.cassandra.DefaultSessionProvider.connect(CqlSessionProvider.scala:53)
    [info]  at akka.stream.alpakka.cassandra.scaladsl.CassandraSession.<init>(CassandraSession.scala:53)
    [info]  at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.startSession(CassandraSessionRegistry.scala:99)
    [info]  at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.$anonfun$sessionFor$1(CassandraSessionRegistry.scala:84)
    [info]  at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
    [info]  at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:84)
    [info]  at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:74)
    [info]  at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:64)
    [info]  at akka.persistence.cassandra.journal.CassandraJournal.<init>(CassandraJournal.scala:88)
    [info]  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    [info]  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    [info]  at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    [info]  at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    [info]  at akka.util.Reflect$.instantiate(Reflect.scala:73)
    [info]  at akka.actor.ArgsReflectConstructor.produce(IndirectActorProducer.scala:101)
    [info]  at akka.actor.Props.newActor(Props.scala:226)
    [info]  at akka.actor.ActorCell.newActor(ActorCell.scala:613)
    [info]  at akka.actor.ActorCell.create(ActorCell.scala:640)
    [info]  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:513)
    [info]  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:535)
    [info]  at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:295)
    [info]  at akka.dispatch.Mailbox.run(Mailbox.scala:230)
    [info]  at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    [info]  at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    [info]  at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)

    What could be the cause?