Global-scale event sourcing and event collaboration with causal consistency
Leave the ReplicationProtocol unchanged and send scheduled ReplicaVersionVectors from the EventLog to the remote Replicator (with a default of 1 minute perhaps).
The emitterId of a DurableEvent indicates the log that initially emitted the event (i.e. in the example above A, B or C). When processing events you could maintain a version vector for each emitter id by merging the vector timestamps of all events from the same emitter. As far as I understand this is what you are trying to get, right?
A = (A -> 1)
B = (A -> 1, C -> 1)
C = (C -> 1)

A = (A -> 1, C -> 1)
B = (A -> 1, C -> 1)
C = (A -> 1, C -> 1)
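A minimal sketch of that per-emitter merge, assuming a simplified VectorTime (a plain Map[String, Long]) and a stripped-down DurableEvent rather than Eventuate's actual classes; updateRTM is an illustrative name:

```scala
object RtmSketch {
  // Simplified stand-ins for Eventuate's VectorTime and DurableEvent.
  type VectorTime = Map[String, Long]
  final case class DurableEvent(emitterId: String, vectorTimestamp: VectorTime)

  // Merge = pointwise maximum of two vector times.
  def merge(a: VectorTime, b: VectorTime): VectorTime =
    (a.keySet ++ b.keySet).map { k =>
      k -> math.max(a.getOrElse(k, 0L), b.getOrElse(k, 0L))
    }.toMap

  // One merged vector timestamp per emitter id, updated event by event.
  def updateRTM(rtm: Map[String, VectorTime], e: DurableEvent): Map[String, VectorTime] =
    rtm.updated(e.emitterId, merge(rtm.getOrElse(e.emitterId, Map.empty), e.vectorTimestamp))
}
```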
the RTM built at each location would lag a little bit behind
This is true, but the word "little" is a bit tricky. In this example it will lag until C writes a new event (C -> 2, A -> 1, B -> 1).
So in the worst case, if C is an endpoint that doesn't persist many events, stabilization will lag dramatically.
I also wonder if this all is an application level concern
IMHO it is: stabilization is a property of the vector clocks used by the EventLog and it should be in the [core] module; CRDTs just use it for optimization.
Also, I implemented it there because I started out using ReplicationRead & ReplicationWrite and replicaVersionVectors, so I was forced to. However, if we agree on a solution that doesn't need any of these and you think it should go in the [crdt] module, I could do it.
could it make sense to integrate this in the CRDT logic instead of the log?
You mean like having an EventsourcedActor that receives all events and builds the RTM using the emitterId?
This is affected by the lag issue discussed above.
To get a better understanding of the optimization I quickly browsed the paper "Making Operation-based CRDTs Operation-based". As far as I understand, the optimizations regarding causal stability would affect implementations in Eventuate like the versionedEntries in ORSet and could lead to more efficient storage, is that correct? I actually wonder if it makes sense to start with an implementation that does not introduce new messages between the locations, even if that comes with a potentially significant lag. Actually, to me this also seems to be the approach proposed in the paper (in section 4, after the definition of "Causal Stability").
IMHO it is: stabilization is a property of the vector clocks used by the EventLog and it should be in the [core] module; CRDTs just use it for optimization.
I agree that causal stability is a property of the distributed log. However, I wonder if some sort of application-specific configuration is required anyway, as in Eventuate the logs do not have a priori knowledge about the existing locations, which seems to be required to determine causal stability. I guess atm Eventuate could just determine causal stability for all locations it has already seen. What are your thoughts on this?
implementations in eventuate like the versionedEntries in ORSet and could lead to more efficient storage, is that correct?
I've implemented the AWSet (aka ORSet) as a pure op-based CRDT. This kind of CRDT uses the stable VectorTime (aka TCStable) to prune the POLog (basically the set of operations), so yes, it is for more efficient in-memory storage (with snapshots this should also lead to more efficient persistent storage).
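A rough sketch of that pruning, under the same simplified VectorTime as above; Versioned, causallyLE and prune are illustrative names, not Eventuate's API. Operations causally at or below TCStable can no longer be concurrent with anything new, so (for an AWSet) their timestamps can be dropped and their values compacted into plain state:

```scala
object PologSketch {
  type VectorTime = Map[String, Long]
  final case class Versioned[A](value: A, vectorTimestamp: VectorTime)

  // a <= b in the causal partial order: every entry of a is covered by b.
  def causallyLE(a: VectorTime, b: VectorTime): Boolean =
    a.forall { case (k, v) => v <= b.getOrElse(k, 0L) }

  // Split the POLog at TCStable: stable operations keep only their values,
  // the rest keep their timestamps for future concurrency checks.
  def prune[A](polog: Set[Versioned[A]], stable: VectorTime): (Set[A], Set[Versioned[A]]) = {
    val (stableOps, unstableOps) = polog.partition(op => causallyLE(op.vectorTimestamp, stable))
    (stableOps.map(_.value), unstableOps)
  }
}
```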
I actually wonder if it makes sense to start with an implementation that does not introduce new messages between the locations even if that comes with a potentially significant lag
I agree, provided we document this lag behavior. So should I go back to using ReplicationRead and ReplicationWrite to update the RTM? Or do you prefer a less intrusive implementation?
However, I wonder if some sort of application-specific configuration is required anyway, as in Eventuate the logs do not have a priori knowledge about the existing locations, which seems to be required to determine causal stability
This is correct. Stabilization requires knowing all the partitions of an EventLog. I've already implemented and tested a solution for this that requires a couple of messages from the Connector while it is creating the corresponding Replicators; after these initial messages it uses replicated events to resolve the set of partitions.
My current implementation uses the conf eventuate.endpoint.connections (this conf is always required, isn't it?).
The only "issue" with my current implementation is regarding changes in the cluster, e.g. added/removed connection endpoint, but is safe to do this in Eventuate? In that case I guess I could check for the ApplicationId
and ask the user to change when it adds/removes an endpoint (to restrictive?)
one could build a network A -> B where only events emitted at A are replicated to B, but events emitted at B are not replicated to A
I haven't tried replication filters, so I want to ask: in this case, are events generated at A not causally related to events generated at B? In other words, will events generated at A always have VectorTime(A -> x) and events generated at B VectorTime(A -> x, B -> y), or are events not replicated from B to A while A still updates its VectorTime on ReplicationReadSuccess from B?
make the CRDTs maintain the RTM they need internally and let them decide about their causal stability. They could be configured in form of a priori knowledge about the locations they should consider
We could read a property eventuate.partitions.crdtServiceId from application.conf when a new CRDTService is instantiated; this property should contain the list of partitions to consider for the specified CRDT service, e.g. eventuate.partitions.awset-service1 = ["A_log1","B_log1"]. Then we could maintain an RTM per CRDTService using an EventsourcedView that extracts the pair (emitterId, vectorTimestamp) from all the DurableEvents of the EventLog. Are you thinking of something like that?
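Something along these lines, perhaps (just a sketch: RTMView and the "rtm" query are hypothetical, and it assumes EventsourcedView's lastEmitterId/lastVectorTimestamp accessors and VectorTime.merge):

```scala
import akka.actor.ActorRef
import com.rbmhtechnology.eventuate.{EventsourcedView, VectorTime}

// Sketch: one view per CRDTService that folds every DurableEvent of the
// log into an RTM, restricted to the partitions read from configuration.
class RTMView(val id: String, val eventLog: ActorRef, partitions: Set[String])
  extends EventsourcedView {

  private var rtm: Map[String, VectorTime] = Map.empty

  override def onCommand: Receive = {
    case "rtm" => sender() ! rtm // hypothetical query message
  }

  override def onEvent: Receive = {
    case _ if partitions.contains(lastEmitterId) =>
      rtm = rtm.updated(
        lastEmitterId,
        rtm.getOrElse(lastEmitterId, VectorTime.Zero).merge(lastVectorTimestamp))
  }
}
```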
Note that the eventuate.partitions.crdtServiceId configuration is error prone: if you add an extra location from which the endpoint will never receive updates, stability will never be reached, but this has no impact on correctness (i.e. the values of the CRDTs will remain correct). On the other hand, if you forget to add a location that is part of the EventLog and is not filtered out (like in the case discussed above), then stability could fire a TCStable(VectorTime(A -> x)) when B has not yet seen VectorTime(A -> x), and this could break CRDTs.
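To make both failure modes concrete, here is how a TCStable candidate could be computed from the RTM (simplified VectorTime again; stableVector is an illustrative name). A forgotten partition means the meet is taken over too few vectors and can run ahead of what that location has seen; an extra, never-updating partition keeps the result at None forever:

```scala
object StabilitySketch {
  type VectorTime = Map[String, Long]

  // Meet = pointwise minimum of two vector times.
  def meet(a: VectorTime, b: VectorTime): VectorTime =
    (a.keySet ++ b.keySet).map { k =>
      k -> math.min(a.getOrElse(k, 0L), b.getOrElse(k, 0L))
    }.toMap

  // TCStable candidate: everything causally at or below this vector has
  // been seen by every configured partition.
  def stableVector(rtm: Map[String, VectorTime], partitions: Set[String]): Option[VectorTime] =
    if (partitions.nonEmpty && partitions.subsetOf(rtm.keySet))
      Some(partitions.toList.map(rtm).reduce(meet))
    else
      None // some configured partition has not been seen yet
}
```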
We could read a property eventuate.partitions.crdtServiceId from application.conf when a new CRDTService is instantiated; this property should contain the list of partitions to consider for the specified CRDT service, e.g. eventuate.partitions.awset-service1 = ["A_log1","B_log1"]
Seems reasonable to me.
Then we could maintain an RTM per CRDTService using an EventsourcedView that extracts the pair (emitterId, vectorTimestamp) from all the DurableEvents of the EventLog.
Not sure if this is feasible?!? Replay times of this view might become infinite (at least without using snapshots). I wonder if it is possible to integrate this into the aggregates?
Note that the eventuate.partitions.crdtServiceId configuration is error prone
Indeed, that is correct. But Eventuate configuration is already for experts only. You could for example build a replication network with cycles over filtered connections, which could lead to persistent event loss (see the warning here). Likewise, incautious use of processors could cause similar problems (as they behave similarly to filtered connections in that respect).
... then stability could fire a TCStable(VectorTime(A -> x)) when B has not yet seen VectorTime(A -> x) and this could break CRDTs.
At least that is only transient, isn't it? I.e. it could be fixed by fixing the configuration and restarting the service as the persistent log is not really affected, is it?
Not sure if this is feasible?!? Replay times of this view might become infinite (at least without using snapshots). I wonder if it is possible to integrate this into the aggregates?
Do you mean creating an RTM per CRDT, where the EventsourcedView will have aggregateId = crdt.aggregateId?
Anyway, snapshots should prevent heavy replays; maybe scheduled snapshots?
At least that is only transient, isn't it? I.e. it could be fixed by fixing the configuration and restarting the service as the persistent log is not really affected, is it?
It is not transient if you take a snapshot of the CRDT: the snapshot will persist a wrongly pruned POLog.
For example, given an event with VectorTime(A -> 1), the following ordering is possible:

1. TCSBActor.onEvent(VectorTime(A -> 1)) emits (as a command) TCStable(VectorTime(A -> 1))
2. CRDTActor receives the TCStable command and performs stability logic
3. CRDTActor receives the event with VectorTime(A -> 1)
Do you mean creating an RTM per CRDT, where the EventsourcedView will have aggregateId = crdt.aggregateId?
Anyway, snapshots should prevent heavy replays; maybe scheduled snapshots?
I actually thought about integrating this into the CRDTActor. Could that make sense?
However, the problem I now see with that is that, being a command, we cannot be sure that the CRDTActor has actually seen the events (aka operations) before processing the command, whereas with an event we were saying that the TCStable is in the causal future of the events that are stable at that TCStable.
If the logic is integrated into the CRDTActor, it could initiate stability logic after the effect is applied, right? So reordering should not occur, and stability logic is also applied on replay, which seems correct as well. WDYT?
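A condensed sketch of that, with the RTM update and stability check run from the event handler (same simplified VectorTime as in the earlier sketches; afterEffect, partitions, rtm and onStable are illustrative names):

```scala
object CrdtStabilitySketch {
  type VectorTime = Map[String, Long]

  def merge(a: VectorTime, b: VectorTime): VectorTime = // pointwise max
    (a.keySet ++ b.keySet).map(k => k -> math.max(a.getOrElse(k, 0L), b.getOrElse(k, 0L))).toMap
  def meet(a: VectorTime, b: VectorTime): VectorTime = // pointwise min
    (a.keySet ++ b.keySet).map(k => k -> math.min(a.getOrElse(k, 0L), b.getOrElse(k, 0L))).toMap

  private var rtm: Map[String, VectorTime] = Map.empty

  // Invoked after the operation's effect has been applied (also on replay),
  // so a stability notification can never overtake the events it covers.
  def afterEffect(partitions: Set[String], emitterId: String, ts: VectorTime)
                 (onStable: VectorTime => Unit): Unit = {
    rtm = rtm.updated(emitterId, merge(rtm.getOrElse(emitterId, Map.empty), ts))
    if (partitions.nonEmpty && partitions.subsetOf(rtm.keySet))
      onStable(partitions.toList.map(rtm).reduce(meet))
  }
}
```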
I actually thought about integrating this into the CRDTActor
Ok, I will implement this option in the next few days and I'll be back when I'm done. However, I must point out that all the CRDTs in a local EventLog will be doing the same work to reach the same TCStable, although it is very cheap.
The overhead will be a Map[String,VectorTime] per CRDTActor, so it depends on:
So, the answer will be "it depends", for example.
In my opinion, solving stability per service makes more sense, but this requires persisting the TCStable for the reasons discussed before.
Persisting TCStable is not that easy, as this event has to be consumed by all CRDTActors, right? But a CRDTActor is an aggregate, so a central emitter would have to route the event explicitly to each and every aggregate, which does not seem feasible. So I guess the event would have to be emitted by the CRDTActor itself. OTOH this would end up in one TCStable event per CRDT, which also seems to introduce quite some overhead.
a CRDTActor is an aggregate so a central emitter would have to route the event explicitly to each and every aggregate which does not seem to be feasible
True, I had forgotten this. The CRDTManager would have to be an EventsourcedActor that creates CRDTActors on events and uses these events to maintain a set of their aggregate ids, to be used as the customAggregateId when the TCStable is persisted. It seems pretty complex, if it is feasible at all, and I don't know the cost involved in using a large set of custom aggregate ids for delivery.
If you agree, I will implement a first version solving stability per CRDTActor (without persisting TCStable) and share it with you when I'm done.
I've finished, should I create a PR?
Sure ... sounds great. However TBH I am not sure how much time I can spend on reviewing this atm so please be patient.
Regarding the Individual Contributor License Agreement: I sent an email on 12/2 but didn't receive an answer.
Sorry, but I do not have insight into this. Maybe @krasserm knows more about this?
ReplayFailure is sent to the actor only once (instead of 10 times)
There is a reference.conf under test/resources and this defines replay-retry-max as 0. Does that answer your question?
Newbie question: are there some extra steps necessary when trying to integrate eventuate with IntelliJ IDEA? I'm trying to use it to debug tests, but all I'm getting is:
Unable to load a Suite class. This could be due to an error in your runpath. Missing class: com.rbmhtechnology.eventuate.EventsourcedProcessorSpec
java.lang.ClassNotFoundException: com.rbmhtechnology.eventuate.EventsourcedProcessorSpec