Another option to consider is keeping them in memory much longer by using a longer passivation interval. For example, we try to model our entities' passivation on real-world usage.
In some cases we simply keep the entities in memory permanently, effectively using persistence only as a backup and for the read side.
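As a rough illustration of the longer passivation interval described above, a config sketch could look like the following. It assumes Cluster Sharding's `passivate-idle-entity-after` setting, whose name depends on the Akka version (newer releases configure passivation strategies under a different path). The 30 minute value is purely illustrative.

```hocon
akka.cluster.sharding {
  # Assumption: setting name as used in Akka 2.5.18+ / 2.6.x.
  # Keep idle entities in memory for 30 minutes before passivating them
  # (illustrative value, tune it to real-world usage).
  passivate-idle-entity-after = 30 minutes

  # Alternative: never passivate automatically, so entities stay in memory
  # until they are stopped explicitly.
  # passivate-idle-entity-after = off
}
```

Keeping entities in memory longer trades heap usage for fewer recoveries from the journal, so the interval is worth checking against the number of live entities.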
Persistence failure when replaying events for persistenceId [con_X1YF9E0AAI8Ae6ba]. Last known sequence number [1]
java.lang.IllegalStateException: Sequence number [2] still missing after [10.00 s], saw unexpected seqNr [3] for persistenceId [con_X1YF9E0AAI8Ae6ba].
while the events table sequence_nrs are correctly going nicely from 1 to 4.
akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [80]. PersistenceId [CustomerEntity|42], due to: Exception during recovery. Last known sequence number [80]. PersistenceId [CustomerEntity|42], due to: Could not resolve type id 'CustomerEntity$Event$ShareAdded' as a subtype of `CustomerEntity$Event$ShareAdded`: known type ids = [AccountCLosed, Credited, Debited]
cleanup-old-persistence-ids, the application was leaking memory otherwise.
Unable to find missing tagged eventTag: .... Missing: .... Previous offset: ....
I see the following with version 1.0.3
Could not load org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal: java.lang.ClassNotFoundException: org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal {}
[info] java.lang.ClassNotFoundException: org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
[info] at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
[info] at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
[info] at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
[info] at java.base/java.lang.Class.forName0(Native Method)
[info] at java.base/java.lang.Class.forName(Class.java:315)
[info] at com.datastax.oss.driver.internal.core.util.Reflection.loadClass(Reflection.java:55)
[info] at com.datastax.oss.driver.internal.core.util.DependencyCheck.isPresent(DependencyCheck.java:68)
[info] at com.datastax.oss.driver.internal.core.context.DefaultDriverContext.buildRequestProcessorRegistry(DefaultDriverContext.java:533)
[info] at com.datastax.oss.driver.internal.core.util.concurrent.LazyReference.get(LazyReference.java:55)
[info] at com.datastax.oss.driver.internal.core.context.DefaultDriverContext.getRequestProcessorRegistry(DefaultDriverContext.java:863)
[info] at com.datastax.oss.driver.internal.core.session.DefaultSession.<init>(DefaultSession.java:121)
[info] at com.datastax.oss.driver.internal.core.session.DefaultSession.init(DefaultSession.java:88)
[info] at com.datastax.oss.driver.api.core.session.SessionBuilder.buildDefaultSessionAsync(SessionBuilder.java:665)
[info] at com.datastax.oss.driver.api.core.session.SessionBuilder.buildAsync(SessionBuilder.java:598)
[info] at akka.stream.alpakka.cassandra.DefaultSessionProvider.connect(CqlSessionProvider.scala:53)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSession.<init>(CassandraSession.scala:53)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.startSession(CassandraSessionRegistry.scala:99)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.$anonfun$sessionFor$1(CassandraSessionRegistry.scala:84)
[info] at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:84)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:74)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:64)
[info] at akka.persistence.cassandra.journal.CassandraJournal.<init>(CassandraJournal.scala:88)
[info] at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[info] at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
[info] at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[info] at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
[info] at akka.util.Reflect$.instantiate(Reflect.scala:73)
[info] at akka.actor.ArgsReflectConstructor.produce(IndirectActorProducer.scala:101)
[info] at akka.actor.Props.newActor(Props.scala:226)
[info] at akka.actor.ActorCell.newActor(ActorCell.scala:613)
[info] at akka.actor.ActorCell.create(ActorCell.scala:640)
[info] at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:513)
[info] at akka.actor.ActorCell.systemInvoke(ActorCell.scala:535)
[info] at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:295)
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:230)
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
[info] at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
[info] at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
...
What can be the cause?
alpakka.cassandra {
  # The implementation of `akka.stream.alpakka.cassandra.CqlSessionProvider`
  # used for creating the `CqlSession`.
  # It may optionally have a constructor with an `ClassicActorSystemProvider` and `Config` parameters.
  session-provider = "akka.stream.alpakka.cassandra.DefaultSessionProvider"
  # Configure Akka Discovery by setting a service name
  service-discovery {
    name = ""
    lookup-timeout = 1 s
  }
  # The ExecutionContext to use for the session tasks and future composition.
  session-dispatcher = "akka.actor.default-dispatcher"
  # Full config path to the Datastax Java driver's configuration section.
  # When connecting to more than one Cassandra cluster different session configuration can be
  # defined with this property.
  # See https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/#quick-overview
  # and https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
  datastax-java-driver-config = "datastax-java-driver"
}
Hi Akka team, I would like to report that I hit an issue similar to the one described above, with missing events when using the Cassandra persistence query.
Relevant log: java.lang.RuntimeException: 97124 missing tagged events for tag [TAG]. Failing without search.
My configuration is pretty basic and follows the reference conf.
events-by-tag {
  bucket-size = "Hour"
  eventual-consistency-delay = 2s
  flush-interval = 25ms
  pubsub-notification = on
  first-time-bucket = "20201001T00:00"
  max-message-batch-size = 20
  scanning-flush-interval = 5s
  verbose-debug-logging = true
  max-missing-to-search = 5000
  gap-timeout = 5s
  new-persistence-id-scan-timeout = 0s
}
We are deploying on AWS Keyspaces, and there are 7-8 persistent actors at the moment.
We are using currentEventsByTag (with an Offset) to implement a data polling endpoint.
This issue only happens when the offset is set to the very end of the previous hour, i.e. the 59th minute and 31+ seconds of the previous hour. Once the clock moves on by another hour, retrying the same query no longer causes this error.
Example: if it is now 14:05:00 and I try to get data from 13:59:59, I get an error in the logs saying that 97124 events are missing. After we "fixed" this, it turned out that there were only 59 events for that query, which we confirmed in the database.
There were two ways to "fix" this: either set max-missing-to-search to 100k or 1M, which would be fine, I guess, if I knew that this number would never grow beyond 97124, or set new-persistence-id-scan-timeout = 0s, per advice from @pavel.pliachystsik_gitlab.
I chose the latter, but I am still not sure that it is a correct fix.
I would appreciate any feedback, as I couldn't find anything about this case in the documentation. Thanks.
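For readers comparing the two workarounds mentioned above, they could be written roughly as follows. This is only a sketch. The enclosing path `akka.persistence.cassandra.events-by-tag` is an assumption based on the 1.0.x plugin layout and is not stated in the message. The values are the ones quoted above.

```hocon
akka.persistence.cassandra.events-by-tag {
  # Workaround 1: raise the limit on how many missing tagged events
  # the query is willing to search for (100k or 1M were the values tried).
  # max-missing-to-search = 1000000

  # Workaround 2 (the one chosen above): do not wait for a scan for
  # new persistence ids.
  new-persistence-id-scan-timeout = 0s
}
```

Only one of the two settings was changed at a time in the report, which is why the first one is left commented out here.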
@pruthvi578
I get the error below when I try to configure Akka Cassandra for persistence:
[rajupru@ftc-svt-dev-sdap-dev-vm hb_movecervice]$ Uncaught error from thread [Uncaught error from thread [SimplivityHeartbeatInfoSightCluster-cassandra-plugin-default-dispatcher-25]: com/codahale/metrics/JmxReporter, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[SimplivityHeartbeatInfoSightCluster]
java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter
at com.datastax.driver.core.Metrics.<init>(Metrics.java:146)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1501)
at com.datastax.driver.core.Cluster.init(Cluster.java:208)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:376)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:355)
at akka.persistence.cassandra.ConfigSessionProvider$$anonfun$connect$1.apply(ConfigSessionProvider.scala:50)
at akka.persistence.cassandra.ConfigSessionProvider$$anonfun$connect$1.apply(ConfigSessionProvider.scala:44)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:253)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:92)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Uncaught error from thread [SimplivityHeartbeatInfoSightCluster-cassandra-plugin-default-dispatcher-24]: com/codahale/metrics/JmxReporter, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[SimplivityHeartbeatInfoSightCluster]
java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter
Could you please help me resolve this?
I have added the dependencies below to the build for Akka Cassandra:
val akkaCsdraPrst = "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.104"
val dropwizardMetics = "io.dropwizard.metrics" % "metrics-core" % "4.0.0"
val dropwizardMetricsJmx = "io.dropwizard.metrics" % "metrics-jmx" % "4.0.0"
In application.conf:
`cassandra-journal {
  contact-points = ["127.0.0.1:9042"]
  authentication.username = "test"
  authentication.password = "password"
}
cassandra-snapshot-store {
  keyspace = "move-service_test"
  contact-points = ["127.0.0.1:9042"]
  authentication.username = "test"
  authentication.password = "password"
}
datastax-java-driver.advanced.reconnect-on-init = true
datastax-java-driver {
  basic.contact-points = ["127.0.0.1:9042"]
}
akka {
  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"
    cassandra {
      journal.keyspace = "move_service_test"
      journal.authentication.username = "test"
      journal.authentication.password = "password"
    }
  }
}
`
2020-11-19 06:29:51.714UTC WARN [SimplivityHeartbeatInfoSightCluster-akka.actor.default-dispatcher-20] a.p.c.q.s.CassandraReadJournal - Failed to connect to Cassandra and initialize. It will be retried on demand. Caused by: Authentication error on host /15.112.131.35:9042: Provided username cassandraadmin and/or password are incorrect
2020-11-19 06:29:51.739UTC WARN [SimplivityHeartbeatInfoSightCluster-akka.actor.default-dispatcher-20] a.p.c.j.CassandraJournal - Failed to connect to Cassandra and initialize. It will be retried on demand. Caused by: Authentication error on host /15.112.131.35:9042: Provided username cassandraadmin and/or password are incorrect
2020-11-19 06:29:51.742UTC WARN [SimplivityHeartbeatInfoSightCluster-akka.actor.default-dispatcher-20] a.p.c.s.CassandraSnapshotStore - Failed to connect to Cassandra and initialize. It will be retried on demand. Caused by: Authentication error on host /15.112.131.35:9042: Provided username cassandraadmin and/or password are incorrect
2020-11-19 06:29:51.754UTC ERROR[SimplivityHeartbeatInfoSightCluster-akka.actor.default-dispatcher-18] c.h.s.a.RdaTaskManagerActor - Persistence failure when replaying events for persistenceId [hou4-heartbeat-rda-manager]. Last known sequence number [0]
com.datastax.driver.core.exceptions.AuthenticationException: Authentication error on host /15.112.131.35:9042: Provided username cassandraadmin and/or password are incorrect
eventsByTag(myConstantTag)?
eventsByTag will stream, in order, all events that have been tagged with a certain name. What if I want to stream all events regardless of the tag?