cleanup-old-persistence-ids, the application was leaking memory otherwise.
Unable to find missing tagged event. Tag: .... Missing: .... Previous offset: ....
I see the following with version 1.0.3
Could not load org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal: java.lang.ClassNotFoundException: org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal {}
[info] java.lang.ClassNotFoundException: org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
[info] at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
[info] at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
[info] at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
[info] at java.base/java.lang.Class.forName0(Native Method)
[info] at java.base/java.lang.Class.forName(Class.java:315)
[info] at com.datastax.oss.driver.internal.core.util.Reflection.loadClass(Reflection.java:55)
[info] at com.datastax.oss.driver.internal.core.util.DependencyCheck.isPresent(DependencyCheck.java:68)
[info] at com.datastax.oss.driver.internal.core.context.DefaultDriverContext.buildRequestProcessorRegistry(DefaultDriverContext.java:533)
[info] at com.datastax.oss.driver.internal.core.util.concurrent.LazyReference.get(LazyReference.java:55)
[info] at com.datastax.oss.driver.internal.core.context.DefaultDriverContext.getRequestProcessorRegistry(DefaultDriverContext.java:863)
[info] at com.datastax.oss.driver.internal.core.session.DefaultSession.<init>(DefaultSession.java:121)
[info] at com.datastax.oss.driver.internal.core.session.DefaultSession.init(DefaultSession.java:88)
[info] at com.datastax.oss.driver.api.core.session.SessionBuilder.buildDefaultSessionAsync(SessionBuilder.java:665)
[info] at com.datastax.oss.driver.api.core.session.SessionBuilder.buildAsync(SessionBuilder.java:598)
[info] at akka.stream.alpakka.cassandra.DefaultSessionProvider.connect(CqlSessionProvider.scala:53)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSession.<init>(CassandraSession.scala:53)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.startSession(CassandraSessionRegistry.scala:99)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.$anonfun$sessionFor$1(CassandraSessionRegistry.scala:84)
[info] at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:84)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:74)
[info] at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:64)
[info] at akka.persistence.cassandra.journal.CassandraJournal.<init>(CassandraJournal.scala:88)
[info] at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[info] at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
[info] at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[info] at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
[info] at akka.util.Reflect$.instantiate(Reflect.scala:73)
[info] at akka.actor.ArgsReflectConstructor.produce(IndirectActorProducer.scala:101)
[info] at akka.actor.Props.newActor(Props.scala:226)
[info] at akka.actor.ActorCell.newActor(ActorCell.scala:613)
[info] at akka.actor.ActorCell.create(ActorCell.scala:640)
[info] at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:513)
[info] at akka.actor.ActorCell.systemInvoke(ActorCell.scala:535)
[info] at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:295)
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:230)
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
[info] at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
[info] at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
...
What could be the cause?
```
alpakka.cassandra {
  # The implementation of `akka.stream.alpakka.cassandra.CqlSessionProvider`
  # used for creating the `CqlSession`.
  # It may optionally have a constructor with `ClassicActorSystemProvider` and `Config` parameters.
  session-provider = "akka.stream.alpakka.cassandra.DefaultSessionProvider"

  # Configure Akka Discovery by setting a service name
  service-discovery {
    name = ""
    lookup-timeout = 1 s
  }

  # The ExecutionContext to use for the session tasks and future composition.
  session-dispatcher = "akka.actor.default-dispatcher"

  # Full config path to the Datastax Java driver's configuration section.
  # When connecting to more than one Cassandra cluster, different session configurations
  # can be defined with this property.
  # See https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/#quick-overview
  # and https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
  datastax-java-driver-config = "datastax-java-driver"
}
```
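The `session-provider` setting above can point at a custom implementation. A minimal sketch, assuming the Alpakka `CqlSessionProvider` trait with its single `connect` method (the class name `MySessionProvider` is hypothetical):

```scala
import scala.concurrent.{ ExecutionContext, Future }
import scala.jdk.FutureConverters._

import akka.actor.ClassicActorSystemProvider
import akka.stream.alpakka.cassandra.CqlSessionProvider
import com.datastax.oss.driver.api.core.CqlSession
import com.typesafe.config.Config

// Hypothetical provider; it would be registered with
//   alpakka.cassandra.session-provider = "com.example.MySessionProvider"
class MySessionProvider(system: ClassicActorSystemProvider, config: Config)
    extends CqlSessionProvider {

  // Build the session asynchronously; the driver reads its own settings
  // from the `datastax-java-driver` config section by default.
  override def connect()(implicit ec: ExecutionContext): Future[CqlSession] =
    CqlSession.builder().buildAsync().asScala
}
```

The optional `(ClassicActorSystemProvider, Config)` constructor mentioned in the comment is what lets the plugin instantiate the provider reflectively with the session's config section.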
Hi Akka team, I would like to report that I am facing a similar issue to the one above, with missing events when using the Cassandra persistence query.
Relevant log: java.lang.RuntimeException: 97124 missing tagged events for tag [TAG]. Failing without search.
My configuration is pretty basic and follows the reference conf:
```
events-by-tag {
  bucket-size = "Hour"
  eventual-consistency-delay = 2s
  flush-interval = 25ms
  pubsub-notification = on
  first-time-bucket = "20201001T00:00"
  max-message-batch-size = 20
  scanning-flush-interval = 5s
  verbose-debug-logging = true
  max-missing-to-search = 5000
  gap-timeout = 5s
  new-persistence-id-scan-timeout = 0s
}
```
We are deploying on AWS Keyspaces; there are 7-8 persistent actors at the moment.
We are using currentEventsByTag(with Offset) to implement a data polling endpoint.
This issue only happens when offset is set to the very end of the previous hour, i.e. 59th minute and 31+s of previous hour. After clock moves another hour, retrying the same query will not cause this error.
Example: if now is 14:05:00 and I try to get data from 13:59:59, I will get an error in the logs that 97124 events are missing, but after we "fixed" this it turned out that there were only 59 events for that query, which was confirmed in the database.
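For context, the query is issued roughly like this (a sketch, assuming the `timeBasedUUIDFrom` helper on `CassandraReadJournal`; the tag name and the timestamp near the hour boundary are illustrative):

```scala
import java.time.{ ZoneOffset, ZonedDateTime }

import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.{ Offset, PersistenceQuery }

val system = ActorSystem("example")
val queries = PersistenceQuery(system)
  .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

// Offset at the very end of the previous hour: the case that triggers
// the "missing tagged events" error.
val ts = ZonedDateTime.of(2020, 11, 18, 13, 59, 59, 0, ZoneOffset.UTC)
  .toInstant.toEpochMilli
val uuid = queries.timeBasedUUIDFrom(ts)

// With bucket-size = "Hour", this offset sits right at a bucket boundary.
queries.currentEventsByTag("TAG", Offset.timeBasedUUID(uuid))
```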
There were two ways to "fix" this: either set max-missing-to-search to 100k or 1M, which would be fine I guess if I knew this number would never grow beyond 97124, or set new-persistence-id-scan-timeout = 0s per the advice from @pavel.pliachystsik_gitlab.
I chose the latter, but I am still not sure that it is the correct fix.
I kindly ask you to provide any feedback, as I couldn't find anything about this case in the documentation. Thanks.
@pruthvi578
I get the below error when trying to configure Akka Cassandra for persistence:
[rajupru@ftc-svt-dev-sdap-dev-vm hb_movecervice]$ Uncaught error from thread [SimplivityHeartbeatInfoSightCluster-cassandra-plugin-default-dispatcher-25]: com/codahale/metrics/JmxReporter, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[SimplivityHeartbeatInfoSightCluster]
java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter
at com.datastax.driver.core.Metrics.<init>(Metrics.java:146)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1501)
at com.datastax.driver.core.Cluster.init(Cluster.java:208)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:376)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:355)
at akka.persistence.cassandra.ConfigSessionProvider$$anonfun$connect$1.apply(ConfigSessionProvider.scala:50)
at akka.persistence.cassandra.ConfigSessionProvider$$anonfun$connect$1.apply(ConfigSessionProvider.scala:44)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:253)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:92)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Uncaught error from thread [SimplivityHeartbeatInfoSightCluster-cassandra-plugin-default-dispatcher-24]: com/codahale/metrics/JmxReporter, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[SimplivityHeartbeatInfoSightCluster]
java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter
Please help, how do I resolve this?
I have added the dependencies below for the Akka Cassandra build:
val akkaCsdraPrst = "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.104"
val dropwizardMetics = "io.dropwizard.metrics" % "metrics-core" % "4.0.0"
val dropwizardMetricsJmx = "io.dropwizard.metrics" % "metrics-jmx" % "4.0.0"
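For what it's worth: `com.codahale.metrics.JmxReporter` only exists in Dropwizard Metrics 3.x. In Metrics 4.x the class moved to `com.codahale.metrics.jmx.JmxReporter` in the `metrics-jmx` artifact, which the 3.x DataStax driver used by akka-persistence-cassandra 0.x does not look up, hence the `NoClassDefFoundError`. One way around it is to pin metrics-core to a 3.x version (version number illustrative):

```scala
// build.sbt sketch: the 3.x driver expects Metrics 3.x on the classpath
val dropwizardMetrics = "io.dropwizard.metrics" % "metrics-core" % "3.2.6"
```

Alternatively, driver-level JMX metrics can be disabled so the class is never loaded; check the driver's metrics configuration for your version.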
In application.conf:

```
cassandra-journal {
  contact-points = ["127.0.0.1:9042"]
  authentication.username = "test"
  authentication.password = "password"
}

cassandra-snapshot-store {
  keyspace = "move-service_test"
  contact-points = ["127.0.0.1:9042"]
  authentication.username = "test"
  authentication.password = "password"
}

datastax-java-driver.advanced.reconnect-on-init = true

datastax-java-driver {
  basic.contact-points = ["127.0.0.1:9042"]
}

akka {
  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"
    cassandra {
      journal.keyspace = "move_service_test"
      journal.authentication.username = "test"
      journal.authentication.password = "password"
    }
  }
}
```
2020-11-19 06:29:51.714UTC WARN [SimplivityHeartbeatInfoSightCluster-akka.actor.default-dispatcher-20] a.p.c.q.s.CassandraReadJournal - Failed to connect to Cassandra and initialize. It will be retried on demand. Caused by: Authentication error on host /15.112.131.35:9042: Provided username cassandraadmin and/or password are incorrect
2020-11-19 06:29:51.739UTC WARN [SimplivityHeartbeatInfoSightCluster-akka.actor.default-dispatcher-20] a.p.c.j.CassandraJournal - Failed to connect to Cassandra and initialize. It will be retried on demand. Caused by: Authentication error on host /15.112.131.35:9042: Provided username cassandraadmin and/or password are incorrect
2020-11-19 06:29:51.742UTC WARN [SimplivityHeartbeatInfoSightCluster-akka.actor.default-dispatcher-20] a.p.c.s.CassandraSnapshotStore - Failed to connect to Cassandra and initialize. It will be retried on demand. Caused by: Authentication error on host /15.112.131.35:9042: Provided username cassandraadmin and/or password are incorrect
2020-11-19 06:29:51.754UTC ERROR[SimplivityHeartbeatInfoSightCluster-akka.actor.default-dispatcher-18] c.h.s.a.RdaTaskManagerActor - Persistence failure when replaying events for persistenceId [hou4-heartbeat-rda-manager]. Last known sequence number [0]
com.datastax.driver.core.exceptions.AuthenticationException: Authentication error on host /15.112.131.35:9042: Provided username cassandraadmin and/or password are incorrect
eventsByTag(myConstantTag)?
eventsByTag will stream, in order, all events that have been tagged with a certain name. What if I want to stream all events regardless of the tag?
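The constant-tag suggestion above can be sketched with a `WriteEventAdapter` that tags every persisted event with the same tag, so a single `eventsByTag` query streams the whole journal (the adapter and tag names are made up):

```scala
import akka.persistence.journal.{ Tagged, WriteEventAdapter }

// Wraps every outgoing event in Tagged so that eventsByTag("all")
// sees every event in the journal.
class AllEventsTaggingAdapter extends WriteEventAdapter {
  override def manifest(event: Any): String = ""
  override def toJournal(event: Any): Any = Tagged(event, Set("all"))
}
```

The adapter would be registered under the journal plugin's `event-adapters` / `event-adapter-bindings` config. Note that a single tag concentrates all writes into one tag's buckets, which can become a throughput bottleneck.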
Setting akka.loglevel = OFF and akka.stdout-loglevel = OFF in my application.conf worked for my application logs, but I still get DEBUG output like the following.
23:56:41.709 [Blogs-akka.persistence.cassandra.default-dispatcher-7] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
23:56:41.762 [Blogs-akka.persistence.cassandra.default-dispatcher-7] DEBUG com.datastax.oss.driver.internal.core.util.Reflection - Could not load com.esri.core.geometry.ogc.OGCGeometry: java.lang.ClassNotFoundException: com.esri.core.geometry.ogc.OGCGeometry
java.lang.ClassNotFoundException: com.esri.core.geometry.ogc.OGCGeometry
at java.lang.Class.forNameImpl(Native Method)
at java.lang.Class.forName(Class.java:345)
at com.datastax.oss.driver.internal.core.util.Reflection.loadClass(Reflection.java:55)
at com.datastax.oss.driver.internal.core.util.DependencyCheck.isPresent(DependencyCheck.java:68)
at com.datastax.oss.driver.internal.core.context.DefaultDriverContext.buildCodecRegistry(DefaultDriverContext.java:570)
at com.datastax.oss.driver.internal.core.context.DefaultDriverContext.<init>(DefaultDriverContext.java:255)
at com.datastax.oss.driver.api.core.session.SessionBuilder.buildContext(SessionBuilder.java:721)
at com.datastax.oss.driver.api.core.session.SessionBuilder.buildDefaultSessionAsync(SessionBuilder.java:666)
at com.datastax.oss.driver.api.core.session.SessionBuilder.buildAsync(SessionBuilder.java:598)
at akka.stream.alpakka.cassandra.DefaultSessionProvider.connect(CqlSessionProvider.scala:53)
at akka.stream.alpakka.cassandra.scaladsl.CassandraSession.<init>(CassandraSession.scala:53)
at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.startSession(CassandraSessionRegistry.scala:99)
at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.$anonfun$sessionFor$1(CassandraSessionRegistry.scala:84)
at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry$$Lambda$39711/0x0000000014690220.apply(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:84)
at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:74)
at akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry.sessionFor(CassandraSessionRegistry.scala:64)
at akka.persistence.cassandra.snapshot.CassandraSnapshotStore.<init>(CassandraSnapshotStore.scala:64)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at akka.util.Reflect$.instantiate(Reflect.scala:73)
at akka.actor.ArgsReflectConstructor.produce(IndirectActorProducer.scala:101)
at akka.actor.Props.newActor(Props.scala:226)
at akka.actor.ActorCell.newActor(ActorCell.scala:613)
at akka.actor.ActorCell.create(ActorCell.scala:640)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:513)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:535)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:295)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
23:56:41.763 [Blogs-akka.persistence.cassandra.default-dispatcher-7] INFO com.datastax.oss.driver.internal.core.context.InternalDriverContext - Could not register Geo codecs; this is normal if ESRI was explicit
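`akka.loglevel` only controls logging that goes through Akka's own logging adapter. The DataStax driver and Netty log directly via SLF4J, so their levels have to be set in the SLF4J backend. A Logback sketch (appender configuration omitted for brevity):

```xml
<configuration>
  <logger name="com.datastax.oss.driver" level="WARN"/>
  <logger name="io.netty" level="WARN"/>
  <root level="INFO"/>
</configuration>
```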
Maybe someone here can help debugging an issue we've been seeing on v0.102
We're using eventsByTag with a partitioned set of tags (baseTag-{shardId}) to create some projections on the read side.
Under light or medium load, we're able to "keep up" quite successfully (we measure the TimeBasedUUID we're currently writing vs. the current time).
When we increase the load, we see our delay "sawtoothing". For some amount of time, a given tag may keep up, but eventually slows down and gets further and further behind.
We have an internal check to detect the lag, and restart the stream if the lag is above a certain threshold. After the restart, the stream is very easily able to catch up almost instantly, but eventually lags behind again.
Most of our effort so far has been focused on the Sink side of the stream, on the assumption that we were somehow writing data more slowly than we could read it. After extensive testing, I'm no longer sure this is the case. We've added relatively large buffers to give us the best chance of writing larger, high-throughput batches to our projected views, but those batches are nowhere near full.
It seems as if eventsByTag simply isn't pulling enough events for some reason, or else the events are not being written to tag_views in a timely fashion. It seems more likely to be query-related than write-related, since we're able to catch up almost instantly after a restart.
Any ideas where to look to understand more about why eventsByTag is slowing down?
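A few settings that are often relevant to eventsByTag throughput, sketched with 1.0.x-style config paths (on 0.102 the equivalent settings live under `cassandra-query-journal` and `cassandra-journal.events-by-tag`; the values are illustrative, so check the reference.conf of your version):

```hocon
akka.persistence.cassandra {
  query {
    # rows buffered by the query stage ahead of downstream demand
    max-buffer-size = 500
    # how often a live eventsByTag query polls for new events
    refresh-interval = 3s
  }
  events-by-tag {
    # delay tolerated for out-of-order writes to tag_views
    eventual-consistency-delay = 5s
  }
}
```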
cassandra-plugin.journal.table-autocreate should only be used for local testing. But why is the default schema not good for production? For the keyspace it is obvious, since you need to create it manually with the desired replication (e.g. NetworkTopologyStrategy).
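For reference, creating the keyspace manually for production looks roughly like this (keyspace name, datacenter name, and replication factor are placeholders):

```cql
-- Sketch: production keyspace with NetworkTopologyStrategy
CREATE KEYSPACE IF NOT EXISTS akka
  WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 3};
```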
@johanandren
it seems that the Cassandra connection happens only on the first read/write request because I see logs like
[com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler] [] [s0-io-2] - [s0] Executing query 'SELECT * FROM system_schema.tables' {}
Is there a way to make it not lazy? I want the cluster to fully load the Cassandra metadata when it boots, before handling requests.
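One way to force the connection at boot, as a sketch: look up the session through the `CassandraSessionRegistry` during startup and block until its underlying `CqlSession` is ready (the config path assumes the default akka-persistence-cassandra 1.0.x setup; the timeout is arbitrary):

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry

val system = ActorSystem("example")

// Looking up the session starts it; it is shared with the plugin.
val session = CassandraSessionRegistry(system)
  .sessionFor("akka.persistence.cassandra")

// underlying() completes once the driver has connected and loaded
// the cluster metadata, so await it before serving requests.
Await.result(session.underlying(), 30.seconds)
```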