I just started with Lagom & Akka, and I am following the design described in "Domain Modeling with Akka Persistence Typed".
I am trying to create a brand-new instance of an entity (EntityState), but the event is not getting persisted and I am getting the following error:
00:54:27.862 [error] com.example.impl.entity.EntityClass [persistencePhase=running-cmd, akkaAddress=akka://XXX@127.0.0.1:60685, akkaSource=akka://XXX/system/sharding/StateClass/186/ID1, sourceActorSystem=XXX, persistenceId=StateClass|ID1] - Supervisor StopSupervisor saw failure: null
java.lang.NullPointerException: null
at akka.persistence.typed.javadsl.EventSourcedBehavior.$anonfun$apply$4(EventSourcedBehavior.scala:195)
at akka.persistence.typed.internal.Running$RunningState.applyEvent(Running.scala:78)
at akka.persistence.typed.internal.Running$HandlingCommands.applyEffects(Running.scala:153)
at akka.persistence.typed.internal.Running$HandlingCommands.onCommand(Running.scala:123)
at akka.persistence.typed.internal.Running$HandlingCommands.onMessage(Running.scala:105)
at akka.persistence.typed.internal.Running$HandlingCommands.onMessage(Running.scala:100)
at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:83)
I have a Create command, which invokes onCreate() and eventually attempts to persist an EntityCreated event.
Service implementation method:
@Override
public ServiceCall<CreateMessage, StateView> createState() {
    return message ->
        entityRef(message.getName())
            .<EntityClass.Accepted>ask(replyTo -> new EntityClass.Create(message, replyTo), askTimeout)
            .thenApply(accepted -> toStateView(accepted.getSummary()));
}
Command handler:
private ReplyEffect<Event, StateClass> onCreate(StateClass state, Create cmd) {
    return Effect()
        .persist(new EntityCreated(cmd.getDetails().getName(), Instant.now()))
        // thenReply's function receives the state *after* the event is applied
        .thenReply(cmd.replyTo, updatedState -> new Accepted(EntityClass.toSummary(updatedState)));
}
I am able to confirm the following:
First report: it shows that, because of the lack of an actor supervisor, when creating lots of sharded PersistentActors some may fail during recovery, and the error message is an AskTimeout. Apart from that, there is the throughput problem: we can observe 400K a minute at best during peaks, and because of the intermittent lows it averages about 200K a minute, giving us the mentioned 1 million messages processed in 5 minutes.
https://report-for-patrick-nordwall.s3-us-west-2.amazonaws.com/Report+for+Patrick.html
I can second that tell is not an option, that Kafka will generally consume much faster than the journal can handle, and that you should be sending your Acks from inside the persist block to ensure the journal write completes before you actually acknowledge the message.
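As an illustration of that (class and message names here are invented, not from this thread), a classic persistent actor that acknowledges only from inside the persist callback might look like this:

import akka.persistence.AbstractPersistentActor;

public class IngestActor extends AbstractPersistentActor {

    public static final class Msg {   // an incoming Kafka-sourced message
        public final long offset;
        public final String payload;
        public Msg(long offset, String payload) { this.offset = offset; this.payload = payload; }
    }

    public static final class Ack {   // sent only after the journal write
        public final long offset;
        public Ack(long offset) { this.offset = offset; }
    }

    @Override
    public String persistenceId() { return "ingest-1"; }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(Msg.class, msg ->
                // persist() runs the callback only after the event has been
                // written to the journal, so the Ack cannot outrun the write.
                persist(msg.payload, evt -> getSender().tell(new Ack(msg.offset), getSelf())))
            .build();
    }

    @Override
    public Receive createReceiveRecover() {
        return receiveBuilder().matchAny(evt -> { /* rebuild state here */ }).build();
    }
}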
With regard to the report you posted, we had some bad experiences with mapAsync at greater than 1 level of parallelism, but this is because of some in-order processing requirements. We get more parallelism on the inputs by adding Kafka partitions.
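For reference, here is a minimal Alpakka Kafka sketch of that shape (the topic, group, and handle function are assumptions); mapAsync with parallelism 1 is what enforces one-at-a-time, in-order processing, so more input parallelism has to come from partitions:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OrderedConsumer {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("consumer");
        ConsumerSettings<String, String> settings =
            ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
                .withBootstrapServers("localhost:9092")
                .withGroupId("example-group");

        Consumer.committableSource(settings, Subscriptions.topics("events"))
            // parallelism 1: the next message is not processed until the
            // previous one completes, preserving in-order processing
            .mapAsync(1, msg ->
                handle(msg.record().value()).thenApply(done -> msg.committableOffset()))
            // commit offsets only downstream of the processing stage
            .toMat(Committer.sink(CommitterSettings.create(system)), Consumer::createDrainingControl)
            .run(system);
    }

    private static CompletionStage<Done> handle(String value) {
        // placeholder for the real work, e.g. an ask to a persistent entity
        return CompletableFuture.completedFuture(Done.getInstance());
    }
}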
Additionally, we've been very careful about segregating our dispatchers into more controllable pools, and working very hard to keep things running in the correct dispatcher.
A lot of things will attempt to run in the default dispatcher, which can become overwhelmed and start failing at critical tasks. I would not at all claim to be an expert on configuring dispatchers, but in my experience it was critical to ensuring stability.
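Purely as an illustration of that idea (the name and sizes are invented, not our actual settings), a dedicated pool is declared in application.conf and actors are assigned to it via Props.withDispatcher or deployment config:

critical-journal-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
  # hand the thread back after each message to keep latency predictable
  throughput = 1
}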
I was doing some napkin math on Cassandra throughput when I realized that having 3 nodes in a distributed system in production (either Akka or Cassandra) is a very small and potentially risky number; any single node failure can immediately cause issues establishing a quorum. Adding more Cassandra pods may help with stability as well as throughput. That said, I've worked with 3-node systems in production before, so I understand.
As mentioned above, "too many actors starting at the same time" was one of the issues we also ran into with very large mapAsync parallelism when the messages from Kafka were targeting many different entities: they all attempt recovery from Cassandra at the same time and cause additional load. Yes, ^^^ that also may help if it remains a requirement.
I don't know the exact numbers right now, but we are running 10-20 million persistent actors at several thousand combined TPS (we generate more than is input, so I don't have a good handle on total TPS right now). We're also heavily using the read side to re-index data into Solr, our messages tend to be (in my opinion) quite large, and we also serve API requests from the same nodes, so our performance is affected by those things. For reference, our Akka cluster size is between 6 and 10 nodes depending on environment, and I believe our Cassandra cluster (with additional keyspaces and load from other applications) is between 9 and 12 nodes, depending on environment. We do occasionally bump up against the limits of writing to Cassandra under heavier loads, but avoid losing messages with a very careful Ask pattern.
Another option you may consider is keeping them in memory much longer with a longer passivation interval; for example, we try to model our entities' passivation on real-world usage.
In some cases we effectively keep the entities in memory permanently, using persistence as a backup and for the read side only.
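As a rough sketch of that knob (the value is invented, not a recommendation), the sharding idle timeout can be raised, or switched off to keep entities in memory effectively permanently:

# passivate sharded entities after 2 hours idle instead of the
# default 120 s; set to "off" to disable automatic passivation
akka.cluster.sharding.passivate-idle-entity-after = 2 hours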
Persistence failure when replaying events for persistenceId [con_X1YF9E0AAI8Ae6ba]. Last known sequence number [1]
java.lang.IllegalStateException: Sequence number [2] still missing after [10.00 s], saw unexpected seqNr [3] for persistenceId [con_X1YF9E0AAI8Ae6ba].
while the events table sequence_nrs are correctly going from 1 to 4.

akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [80]. PersistenceId [CustomerEntity|42], due to: Exception during recovery. Last known sequence number [80]. PersistenceId [CustomerEntity|42], due to: Could not resolve type id 'CustomerEntity$Event$ShareAdded' as a subtype of `CustomerEntity$Event$ShareAdded`: known type ids = [AccountCLosed, Credited, Debited]

cleanup-old-persistence-ids, the application was leaking memory otherwise.

Unable to find missing tagged eventTag: .... Missing: .... Previous offset: ....
I see the following with version 1.0.3:
Could not load org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal: java.lang.ClassNotFoundException: org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal {}
[info] java.lang.ClassNotFoundException: org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
[info] at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
[info] at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
[info] at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
[info] at java.base/java.lang.Class.forName0(Native Method)
[info] at java.base/java.lang.Class.forName(Class.java:315)
[info] at com.datastax.oss.driver.internal.core.util.Reflection.loadClass(Reflection.java:55)
[info] at com.datastax.oss.driver.internal.core.util.DependencyCheck.isPresent(DependencyCheck.java:68)
[info] at com.datastax.oss.driver.internal.core.context.DefaultDriverContext.buildRequestProcessorRegistry(DefaultDriverContext.java:533)
[info] at com.datastax.oss.driver.internal.core.util.concurrent.LazyReference.get(LazyReference.java:55)
[info] at com.datastax.oss.driver.internal.core.context.DefaultDriverContext.getRequestProcessorRegistry(DefaultDriverContext.java:863)
[info] at com.datastax.oss.driver.internal.core.session.DefaultSession.<init>(DefaultSession.java:121)
[info] at com.datastax.oss.driver.internal.core.session.DefaultSession.init(DefaultSession.java:88)
[info] at com.datastax.oss.driver.api.core.session.SessionBuilder.buildDefaultSessionAsync(SessionBuilder.java:665)
[info] at com.datastax.oss.driver.api.core.session.SessionBuilder.buildAsync(SessionBuilder.java:598)
[info] at akka.stream.alpakka.cassandra.DefaultSessionProvider.connect(CqlSessionProvider.scala:53)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSession.<init>(CassandraSession.scala:53)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.startSession(CassandraSessionRegistry.scala:99)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.$anonfun$sessionFor$1(CassandraSessionRegistry.scala:84)
[info] at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:84)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:74)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:64)
[info] at akka.persistence.cassandra.journal.CassandraJournal.<init>(CassandraJournal.scala:88)
[info] at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[info] at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
[info] at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[info] at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
[info] at akka.util.Reflect$.instantiate(Reflect.scala:73)
[info] at akka.actor.ArgsReflectConstructor.produce(IndirectActorProducer.scala:101)
[info] at akka.actor.Props.newActor(Props.scala:226)
[info] at akka.actor.ActorCell.newActor(ActorCell.scala:613)
[info] at akka.actor.ActorCell.create(ActorCell.scala:640)
[info] at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:513)
[info] at akka.actor.ActorCell.systemInvoke(ActorCell.scala:535)
[info] at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:295)
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:230)
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
[info] at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
[info] at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
...
What can be the cause?
alpakka.cassandra {
  # The implementation of `akka.stream.alpakka.cassandra.CqlSessionProvider`
  # used for creating the `CqlSession`.
  # It may optionally have a constructor with `ClassicActorSystemProvider` and `Config` parameters.
  session-provider = "akka.stream.alpakka.cassandra.DefaultSessionProvider"

  # Configure Akka Discovery by setting a service name
  service-discovery {
    name = ""
    lookup-timeout = 1 s
  }

  # The ExecutionContext to use for the session tasks and future composition.
  session-dispatcher = "akka.actor.default-dispatcher"

  # Full config path to the Datastax Java driver's configuration section.
  # When connecting to more than one Cassandra cluster, different session
  # configuration can be defined with this property.
  # See https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/#quick-overview
  # and https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
  datastax-java-driver-config = "datastax-java-driver"
}
Hi Akka team, I would like to report that I faced a similar issue as stated above, with missing events when using the Cassandra persistence query.
Relevant log: java.lang.RuntimeException: 97124 missing tagged events for tag [TAG]. Failing without search.
My configuration is pretty basic, per the reference conf:
events-by-tag {
  bucket-size = "Hour"
  eventual-consistency-delay = 2s
  flush-interval = 25ms
  pubsub-notification = on
  first-time-bucket = "20201001T00:00"
  max-message-batch-size = 20
  scanning-flush-interval = 5s
  verbose-debug-logging = true
  max-missing-to-search = 5000
  gap-timeout = 5s
  new-persistence-id-scan-timeout = 0s
}
We are deploying on AWS Keyspaces; at the moment there are 7-8 persistent actors.
We are using currentEventsByTag (with an Offset) to implement a data polling endpoint.
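For context, a minimal Java sketch of that kind of query (the system name and printout are illustrative, and the real endpoint would pass the stored time-based UUID offset instead of Offset.noOffset()):

import akka.actor.ActorSystem;
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
import akka.persistence.query.Offset;
import akka.persistence.query.PersistenceQuery;

public class PollingQuery {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("polling");
        CassandraReadJournal queries =
            PersistenceQuery.get(system)
                .getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());

        // Replays the currently stored events for the tag, then completes;
        // Offset.noOffset() starts from the first time bucket.
        queries
            .currentEventsByTag("TAG", Offset.noOffset())
            .runForeach(env -> System.out.println(env.persistenceId() + " " + env.event()), system);
    }
}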
This issue only happens when the offset is set to the very end of the previous hour, i.e. the 59th minute, 31+ seconds, of the previous hour. After the clock moves into another hour, retrying the same query does not cause this error.
Example: if now is 14:05:00 and I try to get data from 13:59:59, I will get an error in the logs that 97124 events are missing, but after we "fixed" this it turned out that there were only 59 events for that query, which was confirmed in the database.
There were two ways to "fix" this: either setting max-missing-to-search to 100k or 1M (which would be fine, I guess, if I knew that this number would never grow beyond 97124), or setting new-persistence-id-scan-timeout = 0s per the advice from @pavel.pliachystsik_gitlab.
I chose the latter, but I am still not sure that it's the correct fix.
I kindly ask you to provide any feedback, as I couldn't find anything about this case in the documentation. Thanks.
@pruthvi578
I get the below error when trying to configure Akka Cassandra for persistence:
[rajupru@ftc-svt-dev-sdap-dev-vm hb_movecervice]$ Uncaught error from thread [SimplivityHeartbeatInfoSightCluster-cassandra-plugin-default-dispatcher-25]: com/codahale/metrics/JmxReporter, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[SimplivityHeartbeatInfoSightCluster]
java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter
at com.datastax.driver.core.Metrics.<init>(Metrics.java:146)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1501)
at com.datastax.driver.core.Cluster.init(Cluster.java:208)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:376)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:355)
at akka.persistence.cassandra.ConfigSessionProvider$$anonfun$connect$1.apply(ConfigSessionProvider.scala:50)
at akka.persistence.cassandra.ConfigSessionProvider$$anonfun$connect$1.apply(ConfigSessionProvider.scala:44)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:253)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:92)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Uncaught error from thread [SimplivityHeartbeatInfoSightCluster-cassandra-plugin-default-dispatcher-24]: com/codahale/metrics/JmxReporter, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[SimplivityHeartbeatInfoSightCluster]
java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter
Please help: how do I resolve this?
I have added the below dependencies for the Akka Cassandra build:
val akkaCsdraPrst = "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.104"
val dropwizardMetrics = "io.dropwizard.metrics" % "metrics-core" % "4.0.0"
val dropwizardMetricsJmx = "io.dropwizard.metrics" % "metrics-jmx" % "4.0.0"
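One hedged observation: in Dropwizard Metrics 4.x, JmxReporter moved to the metrics-jmx module under the com.codahale.metrics.jmx package, so the 4.0.0 artifacts above do not contain com/codahale/metrics/JmxReporter at the location the DataStax 3.x driver (used by akka-persistence-cassandra 0.104) references. One possible fix (the exact version is an assumption) is to pin the 3.2.x line instead:

// Metrics 3.2.x still provides com.codahale.metrics.JmxReporter at the
// location the DataStax 3.x driver expects
val dropwizardMetricsV3 = "io.dropwizard.metrics" % "metrics-core" % "3.2.6"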
In application.conf:

cassandra-journal {
  contact-points = ["127.0.0.1:9042"]
  authentication.username = "test"
  authentication.password = "password"
}

cassandra-snapshot-store {
  keyspace = "move-service_test"
  contact-points = ["127.0.0.1:9042"]
  authentication.username = "test"
  authentication.password = "password"
}

datastax-java-driver.advanced.reconnect-on-init = true

datastax-java-driver {
  basic.contact-points = ["127.0.0.1:9042"]
}

akka {
  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"
    cassandra {
      journal.keyspace = "move_service_test"
      journal.authentication.username = "test"
      journal.authentication.password = "password"
    }
  }
}