Asynchronously writes journal and snapshot entries to configured JDBC databases so that Akka Actors can recover state
Hi everyone,
I'm trying to delete all events from my journal based on some conditions on the tag column. I managed to do it like this, but I don't like the import in the middle of the code and would like to remove it.
Does anyone know how to do it without this import in the middle?
```scala
def deleteJournal(companyId: CompanyId): EitherF[Done] = {
  val query: PostgresReadJournal =
    PersistenceQuery(actorSystem).readJournalFor[PostgresReadJournal](PostgresReadJournal.Identifier)
  import query.driver.api._

  val deleteQuery =
    query.journals.filter(t => t.tags @> Map("companyId" -> companyId.id.toString)).delete

  EitherT.liftF(
    query.database
      .run(deleteQuery)
      .map(_ => Done))
}
```
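One way to avoid the in-method import (a sketch only, assuming the same `PostgresReadJournal` API your snippet uses, and that `CompanyId` and `EitherF` are your own types): make the read journal a stable `val` on the enclosing class, so `import query.driver.api._` can live once near the top of the class body instead of inside every method.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import cats.data.EitherT
import cats.implicits._
import scala.concurrent.ExecutionContext

// PostgresReadJournal, CompanyId and EitherF come from your own plugin/code base;
// the only point of this sketch is where the slick api import is placed.
class JournalCleanup(actorSystem: ActorSystem)(implicit ec: ExecutionContext) {

  // a stable `val`, so its path-dependent members can be imported from...
  private val query: PostgresReadJournal =
    PersistenceQuery(actorSystem).readJournalFor[PostgresReadJournal](PostgresReadJournal.Identifier)

  // ...once, here, instead of inside each method
  import query.driver.api._

  def deleteJournal(companyId: CompanyId): EitherF[Done] = {
    val deleteQuery =
      query.journals.filter(t => t.tags @> Map("companyId" -> companyId.id.toString)).delete

    EitherT.liftF(query.database.run(deleteQuery).map(_ => Done))
  }
}
```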
On SaveSnapshotSuccess you could choose to automatically delete the old snapshots.
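With the classic Persistence API that pattern looks roughly like this (a minimal sketch; the actor name and persistenceId are made up): react to SaveSnapshotSuccess and delete everything older than the snapshot that was just written.

```scala
import akka.persistence.{PersistentActor, SaveSnapshotSuccess, SnapshotSelectionCriteria}

class ExampleActor extends PersistentActor {
  override def persistenceId: String = "example-entity-1"

  override def receiveCommand: Receive = {
    case SaveSnapshotSuccess(metadata) =>
      // keep only the snapshot that was just written, drop all older ones
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = metadata.sequenceNr - 1))
    // ... handle your own commands here ...
  }

  override def receiveRecover: Receive = PartialFunction.empty
}
```

If you are on the typed API, `RetentionCriteria.snapshotEvery(numberOfEvents, keepNSnapshots)` on the `EventSourcedBehavior` handles old-snapshot cleanup for you without listening for SaveSnapshotSuccess.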
Hello guys!
I have a pretty easy question.
What is the meaning of the jdbc-read-journal.refresh-interval and jdbc-read-journal.journal-sequence-retrieval.query-delay configuration settings?
For example, if I want a polling interval of 500ms, do I set them both to 500ms? Is that correct, or did I misunderstand something?
What is the recommended value?
I have 3 nodes of my application with 30 shards for each entity (I'm using Lagom). The default value for both settings is '1s'.
But during my experiments I found that for good performance I have to set it to <100ms, and in that case my DB comes under heavy load from the polling requests.
The "journal-sequence-retrieval" configuration is a mechanism to ensure that the events are returned in the correct order (it only applies to eventsByTag queries). It basically checks which ids exist in the database, and it detects "missing" ids (which can happen in case a transaction is not committed yet, but also if a transaction fails). The "query-delay" configured how often it checks for new ids. In cases of missing ids, the "max-tries" setting determines the number of tries untill the assumption is made that the id has been skipped over. (note: setting this value too low might cause events to be missing in an eventsByTag query).
The refresh-interval setting determines how often polling for new events is done (for all queries).
It makes no sense to have jdbc-read-journal.journal-sequence-retrieval.query-delay bigger than jdbc-read-journal.refresh-interval. Setting these to equal values seems okay to me.
You can find the defaults here: https://github.com/akka/akka-persistence-jdbc/blob/master/core/src/main/resources/reference.conf
These are recommended unless you need something different.
refresh-interval determines how long the db will wait when the last attempt did not return any more events to retrieve.
If refresh-interval is set to a high value (e.g. 10 seconds), then for a long-running stream there will be no significant slowdown. The only thing is that some events might arrive a bit later.
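To make those knobs concrete, here is a sketch of how the overrides could look. The setting paths match what is discussed above; the values are only an illustration, not a recommendation (the real defaults are in the linked reference.conf).

```scala
import com.typesafe.config.ConfigFactory

// Illustrative overrides for the polling settings discussed above.
val readJournalTuning = ConfigFactory.parseString(
  """
  jdbc-read-journal {
    # how often all queries poll for new events
    refresh-interval = 500ms
    journal-sequence-retrieval {
      # how often the ordering-id detection checks for new/missing ids
      query-delay = 500ms
      # how many checks before a missing id is assumed to have been skipped
      max-tries = 10
    }
  }
  """)

// e.g. ActorSystem("app", readJournalTuning.withFallback(ConfigFactory.load()))
```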
Hello all, I am testing Amazon Aurora PostgreSQL with Akka Persistence JDBC. Does anybody have feedback on using it? Searching the chat I saw that some years ago @calvinlfer asked about it, though I could not find more details.
I am interested in Akka Persistence JDBC configuration tuning specific to Aurora.
From my observations, the queries that recover the journal are taking much longer than on a local PostgreSQL instance (Docker): 29 s vs 1 s for
select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags" from "system"."journal" where ((("persistence_id" = 'processor') and ("deleted" = false)) and ("sequence_number" >= 1)) and ("sequence_number" <= 11854) order by "sequence_number" limit 9223372036854775807;
Thanks!
Hey guys
I've just plugged jdbc into my PoC application and persistence stopped working. The only logs it's giving me are:
[2020-08-02 20:50:59,258] [ERROR] [com.poc.License] [] [License-akka.actor.default-dispatcher-18] - Supervisor RestartSupervisor saw failure: Exception during recovery from snapshot. PersistenceId [License|arturs_license]. Circuit Breaker is open; calls are failing fast
akka.persistence.typed.internal.JournalFailureException: Exception during recovery from snapshot. PersistenceId [License|arturs_license]. Circuit Breaker is open; calls are failing fast
Is there any way to make it give me more information? The log level is already set to debug.
It was working fine with Cassandra before.
hey guys, I am getting errors while saving the snapshot:
(persistence_id, sequence_number)=(MEMBERSHIP-d8ec1971-f095-43d2-a69f-39772463fd90, 43) already exists.
The issue is intermittent. But sometimes the latest snapshot gets deleted, and all the events are replayed for that actor for any new sequence number.
More Info at:
https://discuss.lightbend.com/t/error-while-saving-snapshot-persistence-id-sequence-number-already-exists/7389
I am using Postgres as the backing persistence store. Any leads would be helpful.
I tried running Slick in debug mode; it fails on the upsert for the same sequence id on the third attempt. The first two times the operation succeeds; the third time it fails with the error I quoted in the message above.
The SQL is the same in all the upserts:
Executing prepared update: HikariProxyPreparedStatement@160991021 wrapping update "event"."snapshot" set "created"=?,"snapshot"=? where "persistence_id"=? and "sequence_number"=?; insert into "event"."snapshot" ("persistence_id","sequence_number","created","snapshot") select ?,?,?,? where not exists (select 1 from "event"."snapshot" where "persistence_id"=? and "sequence_number"=?)
That is, it first tries to update and then tries to insert. I don't know why it attempts the insert the third time, because the not-exists check is already there on the insert.
Hi guys! I'm using the akka-persistence-jdbc plugin with Postgres and durable_state. If I create the durable_state table in another schema (not public, but for example work) and set the config jdbc-durable-state-store.tables.durable_state.schemaName to work, the program always throws an error in the logs:
Caused by: org.postgresql.util.PSQLException: ERROR: relation "durable_state" does not exist. But if I create the durable_state table in the public schema, everything is fine.
Can you please tell me if this problem can be worked around?
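Not an authoritative fix, but a sketch of what could be tried as a workaround (the slick.db.url path and the URL below are assumptions to adapt to your own setup): keep the schemaName override, and additionally point the PostgreSQL JDBC URL at the schema with currentSchema, so any unqualified durable_state reference resolves to the work schema.

```scala
import com.typesafe.config.ConfigFactory

// Workaround sketch only; verify the config paths against your plugin version.
val durableStateOverrides = ConfigFactory.parseString(
  """
  jdbc-durable-state-store {
    tables.durable_state.schemaName = "work"
    # currentSchema makes unqualified table names resolve to the "work" schema
    slick.db.url = "jdbc:postgresql://localhost:5432/mydb?currentSchema=work"
  }
  """)
```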