`eventsByTag`, probably.
@scullxbones Looking at the CassandraProjection implementation and the akka-cqrs sample, it abstracts the offset store, how the commit is performed, and some of the stream management.
For example we could have `MongoProjection.atLeastOnce(...)` or `MongoProjection.grouped(...)` for batches...
The offset store could store documents such as:
```
{
  _id: ObjectId,
  projectionName: String, // eg. PersonProjection if we handle events about a Person entity
  projectionKey: String,  // eg. personTag-1 for Person events tagged with personTag-1
  offset: ObjectIdOffset  // from your lib
}
```
The idea is that there is some boilerplate code around event processing, and with Akka Projections we can focus on the event processing itself.
Of course we could write our own implementation, but I think this is quite common code.
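To make the at-least-once mechanics concrete, here is a minimal sketch built around an offset store shaped like the document above; every name in it is hypothetical, loosely modeled on akka-projection, not an actual plugin API:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical offset document, mirroring the shape proposed above
// (the offset is kept as a String here for simplicity).
final case class StoredOffset(projectionName: String, projectionKey: String, offset: String)

// Hypothetical store, in practice backed by a Mongo collection.
trait OffsetStore {
  def load(name: String, key: String): Future[Option[StoredOffset]]
  def save(o: StoredOffset): Future[Unit]
}

// At-least-once: run the handler first, persist the offset after it succeeds.
// A crash between the two steps re-delivers the event on restart.
final class AtLeastOnceProjection(store: OffsetStore)(implicit ec: ExecutionContext) {
  def process(name: String, key: String, offset: String)(handler: => Future[Unit]): Future[Unit] =
    for {
      _ <- handler
      _ <- store.save(StoredOffset(name, key, offset))
    } yield ()
}
```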
Hello @scullxbones
I read the discussion in issue scullxbones/akka-persistence-mongo#37.
Regarding the ordering of events, there are comments about adding a timestamp field to get precision finer than 1s, but AFAIK the current `eventsByTag` queries still use the `_id` field.
Is there any ongoing development to get precision finer than 1s, or am I missing something?
Thanks for the reply @scullxbones,
I am aware of the clock problem, but AFAIK it is already present in current versions because of the timestamp part of the ObjectId.
You even mention it in the docs: "For offsets to operate correctly in a distributed environment, the system clocks of all journal-writing processes should be synchronized."
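For illustration, the one-second granularity is easy to see on plain BSON ObjectIds (this snippet only assumes the `org.bson` types shipped with the Mongo driver):

```scala
import org.bson.types.ObjectId

object ObjectIdPrecision extends App {
  // ObjectId stores its creation time with whole-second precision only.
  val a = new ObjectId()
  val b = new ObjectId()
  println(a.getTimestamp)                   // seconds since the epoch
  println(a.getTimestamp == b.getTimestamp) // true when generated within the same second
  // Within the same second, relative order comes from the counter/machine
  // bytes, not from the wall clock.
}
```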
Regarding the global counter, I agree it would solve the problem, but it would also cost performance, which I think is why it is blocked for now.
The main point for me is that I can deal with clocks while deploying my app, so mentioning the requirement in the docs should get the user's attention.
But I have no way to solve the ordering problem myself, so I am kind of stuck with it.
That being said, I can sometimes detect that the ordering of events is broken and try to fall back on something, but...
If I can give my opinion, I think that having this sub-second timestamp in place would not create any new problems, and it would solve the issue for the majority of the people using `eventsByTag`.
(Anyway, in a distributed environment persisting data with unsynchronized clocks, you will probably run into problems at some point...)
Of course the clock problem remains, but if I (as a user) take care of it, then problem solved.
I am seeing it this way:
Now: events are ordered by `_id`, so with one-second precision at best.
After: events are ordered by the finer-grained timestamp field.
@gael-ft I agree on the existing problem with clock skew. The intent was for that to be a solution for a subset of use cases: event rates below 1/s, or ordering-insensitive / eventually consistent models (CRDTs come to mind). I was trying to eliminate the skew with the global counter.
That said, I see the following viable (not exclusive) options to fix sorts for `EventsByTag`:

1. Add a higher-precision timestamp field, making its use optional.
2. Add a sequence number scoped per tag, for `EventsByTag` use only. Less contention due to mongo document-level locking (1 document = 1 sequence), again making its use optional.

WDYT?
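For illustration, a minimal sketch of the tag-scoped counter idea using the mongo-scala-driver; the counters collection and its layout are hypothetical:

```scala
import com.mongodb.client.model.ReturnDocument
import org.mongodb.scala.{Document, MongoCollection, SingleObservable}
import org.mongodb.scala.model.Filters.equal
import org.mongodb.scala.model.FindOneAndUpdateOptions
import org.mongodb.scala.model.Updates.inc

// One counter document per tag ("1 document = 1 sequence"): only writers of
// the same tag contend on the same document, thanks to document-level locking.
class TagSequence(counters: MongoCollection[Document]) {
  def next(tag: String): SingleObservable[Document] =
    counters.findOneAndUpdate(
      equal("_id", tag),  // the tag itself is the counter document's _id
      inc("seq", 1L),     // atomic increment
      FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)
    )
}
```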
@scullxbones
From my perspective, compared to the current `_id` sort neither option breaks anything, only improvements. With the tag-scoped sequence, `allEvents` would have an ordering problem, whereas the timestamp option has not. So to sum up, I would have three strategies, driven from a conf for example (sketched below):

1. `_id` (for backward compatibility etc., but it could be marked as deprecated in favor of the timestamp one)
2. the higher-precision timestamp
3. the per-tag sequence number
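A sketch of what such a configuration switch could look like; the key name and values below are hypothetical, nothing like this exists in the plugin today:

```
// hypothetical key selecting the eventsByTag ordering strategy
akka.contrib.persistence.mongodb.mongo.events-by-tag-ordering = "timestamp" // or "object-id", "tag-sequence"
```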
Hello @scullxbones
Some of our apps are based on a "multi-tenant" architecture: the software is shared by clients, but each one has its own database, collections...
We are going to rework the implementation of some persistent actors a bit, so why not use your plugin.
But here is the problem:
Since each client has its own database, the configuration key `akka.contrib.persistence.mongodb.mongo.database` is too strict for us.
I thought about it quickly, and here is what I would like to suggest:
Have a `akka.contrib.persistence.mongodb.mongo.database-resolver` configuration key whose value is a FQCN. That class would be responsible for determining the database name when persisting, based on the persistence id for example (it could be something else, like a tag?).
So it would look like this:
```scala
// This trait would be in your library
trait DatabaseResolver {
  def databaseNameFrom(persistenceId: String): String
}

// This class would be on the application side, eg. in package a.b.c
class MyDatabaseResolver extends DatabaseResolver {
  def databaseNameFrom(persistenceId: String): String =
    persistenceId.split('_').head // eg. keep the part before the first '_'
}
```
```
// In the application's configuration file
akka.contrib.persistence.mongodb.mongo.database-resolver = "a.b.c.MyDatabaseResolver"
```
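For what it's worth, a sketch of how the plugin could turn the configured FQCN into an instance, assuming a no-arg constructor (the actual loading mechanism would of course be up to the library):

```scala
import com.typesafe.config.Config

def loadResolver(config: Config): DatabaseResolver = {
  val fqcn = config.getString("akka.contrib.persistence.mongodb.mongo.database-resolver")
  // Reflectively instantiate the application-provided class.
  Class.forName(fqcn).getDeclaredConstructor().newInstance().asInstanceOf[DatabaseResolver]
}
```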
Do you think this feature would be acceptable in the lib?
suffixed collection names
Hi @scullxbones,
Happy to see that you are open to adding this feature to the lib!
Hmm, an interface is not mandatory, but I think it is the most generic way of doing it, letting the application side drive the logic.
That being said, it might be useful to ship some provided strategies (like the regex one you mentioned).
I'll describe what happens in most of our apps so you can decide whether it could be a built-in strategy.
Considering a `PersonActor`:
The persistence identifier will have the following format: `PersonActor|potato_<objectId>`.
I am not sure, but in some cases it could be `PersonActor|potato|<objectId>`. Anyway, it can be handled by a regex.
But the database name is prefixed by our company name, so it becomes `mycompany-potato`, `mycompany-tomato`...
So in our specific case, having an extra configuration key to add a prefix would be perfect.
To conclude, the configuration could look like this (I omitted the `akka.contrib.persistence.mongodb.mongo` prefix):
```
// if I want to do it manually
database-resolver = "my.clazz"

// if I want to use the builtin logic described above
database-regex-from-persistence-id = "(regexForEntityTypeHint)\|(regexForClient)_(regexForEntityObjectId)" // eg. "(\w+)\|(\w+)_([a-z0-9]+)"
database-regex-prefix = "mycompany"
```
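To make the built-in strategy concrete, here is a sketch of what the regex logic could do; the regex, names and error handling are all hypothetical:

```scala
object RegexResolverSketch extends App {
  // Matches e.g. "PersonActor|potato_5f1a2b3c4d": entity type hint, client, object id.
  val pattern = """(\w+)\|(\w+)_([a-z0-9]+)""".r
  val prefix = "mycompany"

  def databaseNameFrom(persistenceId: String): String = persistenceId match {
    case pattern(_, client, _) => s"$prefix-$client"
    case _ => sys.error(s"cannot resolve a database for $persistenceId")
  }

  println(databaseNameFrom("PersonActor|potato_5f1a2b3c4d")) // mycompany-potato
}
```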
> ...the akka specified queries, as well as the all events query?
That's ok for me.
Regarding the read / query side, my opinion is that if you configured your write side as multi-tenant, then the read side should behave the same.
In fact, if the application wants to merge different streams from different tenants, it can be done quite easily with Akka Streams, as sketched below.
Note that the opposite could be done as well (ie. stream all events and let the app filter them), but to me it looks much more dangerous because tenants are mixed by default...,
and from a performance point of view, I think the first approach is better as well.
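For instance, merging per-tenant streams could look like the sketch below; `readJournalFor` is a hypothetical helper standing in for a read journal configured per tenant:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

object TenantMerge {
  // Stand-in for a per-tenant read journal source.
  def readJournalFor(tenant: String): Source[String, NotUsed] =
    Source.single(s"event-from-$tenant")

  // Application-side merge of the per-tenant streams into a single stream.
  val merged: Source[String, NotUsed] =
    List("potato", "tomato").map(readJournalFor).reduce(_ merge _)
}
```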
But I have to admit that I don't really know how to handle the tenant through the interfaces provided by Akka.
Those interfaces are bound to a plugin identifier, and provide methods which have no notion of tenancy.
I will have to dig into the code to see what the solutions are. Do you have an idea?
Perhaps a way on `ScalaDslMongoReadJournal` to set the database, when it was not given in the configuration (ie. a `database-resolver` was given)?
Hi @nleso -
It is not covered in the latest documentation, but there is a legacy approach to supplying connection information to the plugin that can be seen in the older documentation. The implementation is in `MongoSettings.MongoUri`:
```scala
val MongoUri: String = Try(config.getString("mongouri")).toOption match {
  case Some(uri) => uri
  case None => // Use legacy approach
    val Urls = config.getStringList("urls").asScala.toList.mkString(",")
    val Username = Try(config.getString("username")).toOption
    val Password = Try(config.getString("password")).toOption
    val DbName = config.getString("db")
    (for {
      user <- Username
      password <- Password
    } yield {
      s"mongodb://$user:$password@$Urls/$DbName"
    }) getOrElse s"mongodb://$Urls/$DbName"
}
```
You can see that if the `mongouri` configuration is not supplied, it falls back to the legacy fields `urls`, `username`, `password`, `db`... which, now that I've typed all this out, I see just generates a URI. Hmm