    Didac
    @umbreak
    Well, since you are using a library you can probably trust that the library is doing what it says it is doing. Otherwise you can check the akka-persistence-cassandra tests on GitHub to find out how this is tested
    but I guess simply persisting some events, consuming the stream, then persisting another event and consuming the stream again should work.
    In any case, I’m not part of the Akka team
    anilkumble
    @anilkumble
    @umbreak And one more clarification: so far in PersistentActor I see a few APIs to persist data, like persist, persistAll, etc. These methods persist data under the current persistence id; we don't pass a persistence id explicitly, it is taken from the persistenceId() method.
    Do we have any methods to persist data by passing a persistence id explicitly?
    Hope you're getting my point

    Well, since you are using a library you can probably trust that the library is doing what it says it is doing. Otherwise you can check the akka-persistence-cassandra tests on GitHub to find out how this is tested

    Okay let me try this :)

    anilkumble
    @anilkumble
    Instead of a constant value, is it advisable to change the return value of the persistenceId() method at runtime?
    anilkumble
    @anilkumble
    I am using the Cleanup class to delete events in Cassandra. Won't it also delete the values from the tag_scanning table?
    José Luis Colomer Martorell
    @beikern

    Hi,

    I'm trying to read from two different journals using the CassandraReadJournal plugin and just one ActorSystem. Is this possible?

    Here goes my failed attempt:

    val configForJournal1 = ConfigFactory
            .empty()
            .withValue(
              "cassandra-query-journal.write-plugin",
              ConfigValueFactory.fromAnyRef("journal-config-1"))

    val configForJournal2 = ConfigFactory
            .empty()
            .withValue(
              "cassandra-query-journal.write-plugin",
              ConfigValueFactory.fromAnyRef("journal-config-2"))
    
    val cassandraReadjornal1 = PersistenceQuery(system)
            .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier, configForJournal1)
    val cassandraReadjornal2 = PersistenceQuery(system)
            .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier, configForJournal2)
    
    
    cassandraReadjornal1.currentEventsByTag("tag", NoOffset).runWith(Sink.foreach(e => println(e)))
    cassandraReadjornal2.currentEventsByTag("tag", NoOffset).runWith(Sink.foreach(e => println(e)))

    The data printed by both streams is the same, meaning that cassandraReadjornal1 and cassandraReadjornal2
    are attached to the same journal.

    Another clue is that I printed cassandraReadjornal1 and cassandraReadjornal2, both have the same hash...

    [info] crj: akka.persistence.cassandra.query.scaladsl.CassandraReadJournal@5f9ccd0c
    [info] crj: akka.persistence.cassandra.query.scaladsl.CassandraReadJournal@5f9ccd0c

    Is there any way to achieve reading from two cassandra journals using the same ActorSystem?

    Greetings.

    José Luis Colomer Martorell
    @beikern

    Now it's working :D

    just call this twice:

    val cassandraReadjornal1 = PersistenceQuery(system)
            .readJournalFor[CassandraReadJournal]("sample-cassandra-query-journal")

    Using different readJournalPluginConfig.
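For reference, a config-based sketch of what the working setup might look like (the section names `journal-1-query` / `journal-2-query` and the write-plugin ids are made up for illustration; check them against your own configuration):

```hocon
# Hypothetical application.conf fragment: two query plugin sections,
# both based on the default cassandra-query-journal settings but each
# pointing at a different write journal configuration.
journal-1-query = ${cassandra-query-journal}
journal-1-query.write-plugin = "journal-config-1"

journal-2-query = ${cassandra-query-journal}
journal-2-query.write-plugin = "journal-config-2"
```

Each call to `PersistenceQuery(system).readJournalFor[CassandraReadJournal]("journal-1-query")` (and likewise for `journal-2-query`) should then yield a distinct plugin instance, since PersistenceQuery caches read journals per plugin id — which would also explain why the earlier attempt returned the same instance (same hash) twice.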

    anilkumble
    @anilkumble
    Materializer materializer = ActorMaterializer.create(getContext().getSystem());
    CassandraReadJournal journal = PersistenceQuery.get(getContext().getSystem())
            .getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());

    Source<EventEnvelope, NotUsed> data =
            journal.currentEventsByPersistenceId(getSelf().path().name(), 0, Long.MAX_VALUE);

    CompletionStage<Done> completionStage = data.runForeach(i -> {
        System.out.println(i.event());
    }, materializer);

    completionStage.thenRun(() -> {
        persist("sample data to persist", p -> {
            System.out.println("data persisted successfully");
        });
    });

    The "sample data to persist" string is not getting persisted in the database. May I know the reason for this?

    My use case is to query the persisted data for a specific persistence id from the db and, based on the result, persist some value.

    Is this possible?
    Am I doing anything wrong?

    brabo-hi
    @brabo-hi
    Has anyone run Akka Persistence Cassandra on AWS Keyspaces?
    From their documentation, they don't support 'static column', so I am getting this error in deployment:
    com.datastax.driver.core.exceptions.InvalidQueryException: Static column is not yet supported.
    What could be the solution?
    anilkumble
    @anilkumble
    How do I delete records from the tag_write_progress, tag_views, tag_scanning, and metadata tables?
    I am using the Cleanup API to delete. It deletes records only from the messages table
    brabo-hi
    @brabo-hi
    can we use akka-persistence-cassandra 1.0.1 in Lagom?
    Christopher Batey
    @chbatey
    @brabo-hi I don't think so, as it isn't binary compatible with pre-1.0 versions
    We've recently tested against AWS Keyspaces and the latest version works fine
    @anilkumble you can also use the Reconciliation class to delete from the tag-related tables
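A rough sketch of what that could look like (treat the package and method names as assumptions to verify against the akka-persistence-cassandra version you are on):

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.persistence.cassandra.reconciler.Reconciliation

import scala.concurrent.Future

val system = ActorSystem("cleanup")
val reconciliation = new Reconciliation(system)

// Wipe the tag_views table and the related tag progress/scanning data,
// leaving the messages table untouched; tag data can then be rebuilt
// from the journal afterwards if needed.
val done: Future[Done] = reconciliation.truncateTagView()
```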
    anilkumble
    @anilkumble
    Okay @chbatey. Will try that
    SynappsGiteau
    @SynappsGiteau
    Hello everyone, we are trying to configure our Cassandra connection to a Cassandra cluster (hosted by Aiven) that provides only a CA certificate (ca.pem). Could someone please tell us how to proceed? Thank you.
    davidlzs
    @davidlzs
    We are trying to use Akka Persistence Cassandra eventsByTag to build a read-side model. I have a few questions regarding compatibility between the write (persistent) side and the read side: (1) If the write side and read side have a different bucket-size configured, will it cause problems (e.g. missing events on the read side)? (2) Must the read side and write side use the same version of the akka-persistence-cassandra plugin (lib)? TIA.
    anilkumble
    @anilkumble

    akka.pattern.CircuitBreaker$$anon$1: Circuit Breaker Timed out.

    How do I resolve this exception? I am getting it in the onPersistFailure callback.
    Kindly help me with this

    davidlzs
    @davidlzs
    When running Akka Persistence Cassandra eventsByTag, we randomly got the error message: "xxxx missing tagged events for tag [tag-3]. Failing without search". Could you please shed some light on where I should look? TIA.
    Simon D
    @imsdu
    Has anyone run performance tests on Cassandra with an Akka persistence workload with a lot of eventsByTag queries? What tools were used? What were the outcomes, and what Cassandra tuning worked best on the cluster side and on the driver side? I will have to run one soon and it would be nice to have some input :)
    I will share my results when I have some
    malrawi
    @malrawi

    I just started with Lagom & Akka. I am following the design described in Domain Modeling with Akka Persistence Typed

    I am trying to create a brand new instance of an entity (EntityState). But the event is not getting persisted, and I am getting the following error:

    00:54:27.862 [error] com.example.impl.entity.EntityClass [persistencePhase=running-cmd, akkaAddress=akka://XXX@127.0.0.1:60685, akkaSource=akka://XXX/system/sharding/StateClass/186/ID1, sourceActorSystem=XXX, persistenceId=StateClass|ID1] - Supervisor StopSupervisor saw failure: null
    java.lang.NullPointerException: null
    at akka.persistence.typed.javadsl.EventSourcedBehavior.$anonfun$apply$4(EventSourcedBehavior.scala:195)
    at akka.persistence.typed.internal.Running$RunningState.applyEvent(Running.scala:78)
    at akka.persistence.typed.internal.Running$HandlingCommands.applyEffects(Running.scala:153)
    at akka.persistence.typed.internal.Running$HandlingCommands.onCommand(Running.scala:123)
    at akka.persistence.typed.internal.Running$HandlingCommands.onMessage(Running.scala:105)
    at akka.persistence.typed.internal.Running$HandlingCommands.onMessage(Running.scala:100)
    at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:83)

    I have a Create command, which invokes onCreate(), and eventually attempts to persist an EntityCreated event.

    Service Impl method

    @Override
    public ServiceCall<CreateMessage, StateView> createState(){
        return message ->
                entityRef(message.getName())
                        .<EntityClass.Accepted>ask(replyTo -> new EntityClass.Create(message, replyTo), askTimeout)
                        .thenApply(accepted -> toStateView(accepted.getSummary()));
    }

    Command handler:

    private ReplyEffect<Event, StateClass> onCreate(StateClass state, Create cmd) {
        return Effect()
                .persist(new EntityCreated(cmd.getDetails().getName(), Instant.now()))
                .thenReply(cmd.replyTo, e -> new Accepted(EntityClass.toSummary(e)));
    }

    I am able to confirm the following:

    • the exception is thrown during persist()
    • the event is not present in Cassandra

    Your help is appreciated. Thank you in advance!
    Miguel Lemos
    @miguelemosreverte
    Hello there!
    Miguel Lemos
    @miguelemosreverte
    We are about to reach Production using Akka! But we have a question, and it is about how to achieve max throughput when writing events using Akka Persistence Cassandra. We are consistently achieving spikes of 400K persistences per minute, with lows in between because of how batch writes work: 15 seconds of silence, 15 seconds of 400K writes, and so on. All in all, it can process 1 million messages per 5 minutes. This is using 3 t2.2xlarge nodes in AWS, where we have 3 Cassandra pods. Having 1 or 3 pods of Akka does not make a difference, as the bottleneck does not allow us to process any further. If it were not for this bottleneck, we would be seeing Akka at its max potential, which we have seen is around 1 million to 1.5 million messages a minute on this infrastructure. But we can only achieve this speed by using tell instead of ask, so that once a message gets deserialized it is sent away to an Actor. We lose messages this way, because there is no backpressure: half of the persistences get thrown away, never realized. Thus, we need to stay with the Ask pattern and ensure the bottleneck is dealt with. How would you, community, go about this challenge? Imagine one message per sharded actor, where each actor must save the event. A million sharded persisted actors. How can we create them faster than 5 minutes?
    @patriknw It is an honor to have you here. I have seen questions similar to the one I just posted, about write throughput for sharded persistent actors. If we manage to solve this problem in our team we could make a solid example for the community, with metrics attached via Grafana. This could be useful for the community: a proven example ready to deploy on AWS with Grafana metrics attached. We are using Kafka Localized Processing, from your latest contributions to akka-samples! It is fantastic. Thank you all for the contributions you make open source.
    Miguel Lemos
    @miguelemosreverte
    Guys, this is a call for help to the whole akka-persistence-cassandra community: how do you achieve high throughput when writing to, say, a million different sharded persistent actors? What have been your experiences with the journal on this, and what are the pitfalls? Please. Our team will provide an open source example in return for your help, with Grafana metrics, an AWS deployment, and a benchmark to prove this journal works well under heavy load and does not take 5 minutes to process a million unique messages. We are already implementing Localized Processing thanks to your contributions to akka-samples. But we are in the process of deployment to Production, and if we continue to take 5 minutes per million messages, we are going to fail; we are going to have to close down. Again, our specs are 3 AWS t2.2xlarge nodes, 3 Cassandra pods, 3 Akka pods, and one Kafka pod. We are using only 25% of both processing and RAM resources.
    I see people commenting on having lost event persistences; we do lose half of them if not applying backpressure. But even then, we lose 3 out of 10K when using t2.large nodes, and 3K out of a million when using t2.2xlarge nodes. And the cause is the same as some comments point out: Cassandra tuning. Thus, we have both a low throughput (1 million/5 minutes), and we are losing events (3K out of 1 million). We cannot launch production like this. Help is needed, welcome, and will be followed up by us posting open source the entrails of our benchmarked solution for all the next contributors who face the same difficulties.
    Miguel Lemos
    @miguelemosreverte
    I am going to go out on a limb and dare to invoke the community elite by name. I am so sorry for this. We are going to lose a client, maybe go under if this problem goes unsolved. @/all
    Patrik Nordwall
    @patriknw
    @miguelemosreverte Could you explain what you mean by that the events are lost? Is it when writing from EventSourcedBehavior/PersistentActor? What error messages do you see?
    Miguel Lemos
    @miguelemosreverte
    Hello! I will provide a report ASAP
    Miguel Lemos
    @miguelemosreverte

    First report: it shows that, because of the lack of an actor supervisor, when creating lots of sharded PersistentActors some may fail during recovery, and the error message is an AskTimeout. Apart from that, there is the throughput problem: we observe 400K a minute at best during peaks, and because of the intermittent lows it averages about 200K a minute, giving us the mentioned 1 million messages processed in 5 minutes.

    https://report-for-patrick-nordwall.s3-us-west-2.amazonaws.com/Report+for+Patrick.html

    Patrik Nordwall
    @patriknw
    Using tell is not an option. You will most certainly be able to read faster from Kafka than what the actors and Cassandra journal can handle. You need the backpressure from using ask. Increasing parallelism can help. If the order of replies doesn't matter you can use mapAsyncUnordered+ask.
    Scaling out to more pods should help, but if you only see 25% utilization there might be something else, such as K8s CPU limits (throttling) https://erickhun.com/posts/kubernetes-faster-services-no-cpu-limits/
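The mapAsyncUnordered+ask shape described here might look roughly like this sketch (the topic name, the `PersistEvent`/`Ack` types, the `shardRegion` and `consumerSettings` references, and the parallelism value are all placeholders, not taken from this thread):

```scala
import akka.pattern.ask
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.util.Timeout

import scala.concurrent.duration._

implicit val timeout: Timeout = 10.seconds

Consumer
  .plainSource(consumerSettings, Subscriptions.topics("events"))
  .mapAsyncUnordered(parallelism = 64) { record =>
    // ask provides backpressure: at most `parallelism` persistence
    // round-trips are in flight at any point in time.
    (shardRegion ? PersistEvent(record.value())).mapTo[Ack]
  }
  .runWith(Sink.ignore)
```

The parallelism value trades memory and Cassandra load against throughput; it is something to tune against the cluster, not a recommendation.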
    Patrik Nordwall
    @patriknw
    Supervision will not help for the ask timeouts. That’s probably due to Cassandra timing out, or too many actors starting at the same time. You have to retry the ask. That would not be done by supervision.
    James Nowell
    @jamescnowell

    I can second that tell is not an option, that Kafka will generally consume much faster than the journal can handle, and that you should be sending your Acks from inside the persist block to ensure the journal has written before you actually acknowledge the message.

    With regards to the report you posted, we had some bad experiences with mapAsync with greater than 1 level of parallelism, but this is because of some in-order processing requirements. We get more parallelism on the inputs by adding Kafka partitions.

    Additionally, we've been very careful about segregating our dispatchers into more controllable pools, and working very hard to keep things running in the correct dispatcher.

    A lot of things will attempt to run in the default dispatcher, which can become overwhelmed and start failing at critical tasks. I would not at all claim to be an expert on configuring dispatchers, but in my experience it was critical to ensuring stability.
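A dedicated dispatcher pool of the kind described above can be defined in configuration, roughly like this (the name and pool size are illustrative, not taken from this thread):

```hocon
# A separate thread pool so heavy or blocking work does not starve
# the default dispatcher and its critical system tasks.
persistence-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
}
```

Actors can then be assigned to it with `Props(...).withDispatcher("persistence-dispatcher")`.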

    Patrik Nordwall
    @patriknw
    If you are starting many actors at the same time it could also be that the limit of 50 concurrent recoveries is too low. See max-concurrent-recoveries in https://github.com/akka/akka/blob/master/akka-persistence/src/main/resources/reference.conf
    But increasing too much will overload C*.
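For reference, that setting lives under `akka.persistence`; the value below is illustrative, to be raised cautiously for the reason just given:

```hocon
akka.persistence {
  # Maximum number of persistent actors allowed to replay their
  # events from the journal at the same time (default 50).
  max-concurrent-recoveries = 100
}
```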
    James Nowell
    @jamescnowell

    I was doing some napkin math on Cassandra throughput when I realized that having 3 nodes in a distributed system in production (either Akka or Cassandra) is a very small and potentially risky number; any single node failure can immediately cause issues building a quorum. Adding more Cassandra pods may help with stability as well as your throughput. That said, I've worked with 3 node systems in production before, so I can understand.

    As mentioned above, "too many actors starting at the same time" was one of the issues we also ran into with very large mapAsync if the messages from kafka were targeting many different Entities. They all attempt recovery from cassandra at the same time and cause additional load. Yes, ^^^ that also may help if it remains a requirement.

    I don't know the exact numbers right now, but we are running 10-20 million persistent actors at several thousand combined TPS (we generate more than is input, so I don't have a good handle on some kind of total TPS right now). We're also heavily using the read-side to re-index data into Solr, our messages tend to be (in my opinion) quite large, and we also serve API requests from the same nodes, so our performance is affected by those things. For reference, our Akka cluster size is between 6 and 10 nodes depending on environment, and I believe our Cassandra cluster (with additional keyspaces and load from other applications) is between 9 and 12 nodes, depending on environment. We do occasionally bump up against the limits of writing to cassandra under heavier loads, but avoid losing messages with a very careful Ask pattern.

    I guess I'd also note that our 6-10 node akka clusters are all 4 core machines with 8gb of ram, and we're intentionally oversized; primarily bound in performance by writing/reading to/from cassandra/solr.
    Miguel Lemos
    @miguelemosreverte
    Lots of feedback. I will proceed to build a basic benchmarking example for this use case. Being the use case "How to start up as many sharded persistent actors as possible".
    James Nowell
    @jamescnowell

    Another option you may consider is keeping them in memory much longer with a longer passivation interval. For example, we try to model our entities' passivation on real-world usage.

    In some cases, we simply keep the entities in memory permanently, effectively, using persistence as a backup, and for the read-side only.

    Also if the things I'm saying are completely bad or wrong, please let me know! I wouldn't claim to be an expert, just someone who's already been through the struggle
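If this is Akka Cluster Sharding, the passivation interval mentioned above can be tuned along these lines (the 2h value is purely illustrative):

```hocon
akka.cluster.sharding {
  # Keep idle entities in memory for two hours before passivating them;
  # set to "off" to keep entities in memory indefinitely.
  passivate-idle-entity-after = 2h
}
```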
    Miguel Lemos
    @miguelemosreverte
    During the following days I will be posting updates where your feedback is implemented. The updates will contain the benchmarks. For now, I see that we are not going to use tell, we are going to increase concurrent recoveries, and we will add a per-actor dispatcher. We have already tried all these tweaks, but now it is going to be documented for your eyes, and maybe there will be some 'Aha!' moments where we find the correct configuration which will, for all to see, leave a lasting example of how much throughput can be expected from an Akka application once it reaches Production.
    A comment about scaling out: the stress benchmark for our current Cassandra setup showed 11K writes per second. Currently we have only reached peaks of 600K per minute (around 10K writes per second), followed immediately by lows of 300K, so that it averages 200K per minute. 1 million per 5 minutes; 3K TPS.
    The journey is now to reach a maximum stable throughput against Cassandra.
    Thank you all already for your valuable feedback; let's turn that feedback into an open sourced, well documented repository. Over the next few days, I will do so. Thanks again.
    Milan van der Meer
    @milanvdm
    I'm getting a Persistence failure when replaying events for persistenceId [con_X1YF9E0AAI8Ae6ba]. Last known sequence number [1] java.lang.IllegalStateException: Sequence number [2] still missing after [10.00 s], saw unexpected seqNr [3] for persistenceId [con_X1YF9E0AAI8Ae6ba]. while the sequence_nrs in the events table go nicely from 1 to 4.
    I can also decode the blob of sequence_nr 2, so I'm not really sure why it is complaining. Is there any other reason I'm missing for why this error could happen?
    Patrik Nordwall
    @patriknw
    Sounds very strange. What version is that?
    Patrik Nordwall
    @patriknw
    @milanvdm I was thinking that it could be ordered by the timestamp timeuuid, but it's ordered by sequence_nr first. Could you anyway check the writer_uuid and timestamp columns of those rows?