These are chat archives for RBMHTechnology/eventuate

19th
Dec 2016
Martin Krasser
@krasserm
Dec 19 2016 06:52

prepareRemove must be called atSource because it determines what you observe locally before you remove it downstream from an observed-remove set. So I don't think this is a valid approach.

Would it make sense to resolve this using an Ordering instance for A (the payload)?

That's an option but rather restrictive for the payload type. I'd rather let the "last" addition win as explained in http://rbmhtechnology.github.io/eventuate/user-guide.html#automated-conflict-resolution for example.

Alexander Semenov
@Tvaroh
Dec 19 2016 08:21
Thanks, I will consider the LWW option for the payload. I may keep the Ordering option too, since an instance isn't required unless you choose that policy.
Alexander Semenov
@Tvaroh
Dec 19 2016 08:26
Regarding the prepareRemove thing: what I'm trying to do is update the existing payload when I detect that a node with the same id (but a different payload) was added concurrently. I can't observe it at source because it really isn't there yet (and it's an error if the node already exists on add). So the scenario is: I want to add a node, and at source I check that it doesn't exist and that its parent does exist. But then downstream I find that a duplicate node was added concurrently with a different payload, so I need to update the payload (if the LWW or Ordering policy says so), i.e. replace an element in the underlying ORSet. Does that make sense?
  private[tree]
  def prepareCreateChildNode(parentId: Id, nodeId: Id, payload: A): Option[CreateChildNodeOpPrepared[Id, A]] =
    if (nodeExists(parentId))
      if (!nodeExists(nodeId)) Some(CreateChildNodeOpPrepared(parentId, nodeId, payload))
      else throw NodeAlreadyExistsException(nodeId)
    else throw ParentNodeNotExistsException(parentId)

  private[tree]
  def createChildNode(prepared: CreateChildNodeOpPrepared[Id, A], timestamp: VectorTime): TreeCRDT[A, Id] =
    if (nodeExists(prepared.parentId))
      edgesByNodeId.get(prepared.nodeId).fold {
        addEdge(Edge(prepared.nodeId, prepared.parentId, prepared.payload), timestamp)
      } { existingEdge => // concurrent addition conflict
        if (existingEdge.parentId == prepared.parentId) // both nodes under same parent
          if (existingEdge.payload == prepared.payload) this // nodes are equal
          else
            treeConfig.policies.sameParentMappingPolicy match {
              case MappingPolicy.Zero => ???
              case policy@MappingPolicy.ByOrdering() =>
                if (policy.firstWins(existingEdge.payload, prepared.payload)) this // existing node won
                else
                  replaceEdge(existingEdge, existingEdge.copy(payload = prepared.payload), timestamp) // update payload
            }
        else
          ???
      }
    else
      ???
Fabian Koehler
@fkoehler
Dec 19 2016 08:55

We have two nodes in production with the same C* DC where the eventsourced actors will be running. The deployment is rolling. We use Docker on Kubernetes.

From what I understood, if both nodes want to communicate via the same log, each node has its own local log with its own ID which is persisted in C*, and they use a ReplicationEndpoint on each side to replicate the events.

Since the hostnames/IPs change on every deployment, it seems I need to configure the two replication endpoints via automatic lookups into a service discovery. And since there are cases like a new node being added or an old one being removed, it seems I have to build something around the ReplicationEndpoint that listens for changes in the node setup, shuts down the old ReplicationEndpoint and recreates a new one on every change in the infrastructure.

Is this the way to go?

Martin Krasser
@krasserm
Dec 19 2016 08:56
@Tvaroh I understand your requirement, but I'm not sure the "replace" operation commutes the way you implement it (I don't have a counter-example at the moment, though).
Alexander Semenov
@Tvaroh
Dec 19 2016 09:06
@krasserm thanks, I see your point. I'll leave it as is for now; hopefully I'll have a better understanding of this later. Maybe some tests will uncover a problem.
Martin Krasser
@krasserm
Dec 19 2016 09:08
@Tvaroh sounds good, a three-location chaos test like the one for ORSet is a good starting point
Alexander Semenov
@Tvaroh
Dec 19 2016 09:08
cool chaos is good, will look into it!
Alexander Semenov
@Tvaroh
Dec 19 2016 09:14
:+1:
Volker Stampa
@volkerstampa
Dec 19 2016 09:26

@fkoehler

From what I understood, if both nodes want to communicate via the same log, each node has its own local log with its own ID which is persisted in C*, and they use a ReplicationEndpoint on each side to replicate the events.

This is correct.

Using service discovery to set up a ReplicationEndpoint indeed seems the way to go if your hostnames keep changing.
Currently Eventuate does not support dynamic addition and retirement of locations, so at the moment changing this requires shutting down the corresponding actor system and recreating the ReplicationEndpoint. We plan to support dynamic addition/retirement as described in RBMHTechnology/eventuate#314.
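A rough sketch of the "recreate on change" loop described above. The types here are hypothetical stand-ins for the real Akka ActorSystem and Eventuate ReplicationEndpoint/ReplicationConnection, just to illustrate the control flow:

```scala
// Stubbed stand-ins (hypothetical) for the real Akka ActorSystem and
// Eventuate ReplicationEndpoint/ReplicationConnection.
final case class Connection(host: String, port: Int)

final class EndpointStub(val connections: Set[Connection]) {
  var active = true
  def shutdown(): Unit = active = false // real code: terminate the actor system
}

// Rebuild the endpoint whenever service discovery reports a different node set.
final class EndpointManager(initial: Set[Connection]) {
  var endpoint = new EndpointStub(initial)

  def onDiscoveryChange(current: Set[Connection]): Unit =
    if (current != endpoint.connections) {
      endpoint.shutdown()                  // stop the old endpoint
      endpoint = new EndpointStub(current) // recreate with the new connections
    }
}
```

The comparison against the current connection set avoids pointless restarts when discovery re-reports an unchanged topology.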

Alexander Semenov
@Tvaroh
Dec 19 2016 12:01

@krasserm not sure I fully understand the "last" addition wins approach. Should I make edges: ORSet[Edge[A, Id]] (a CRDT field) a var and wrap it in ConcurrentVersions? Like

ConcurrentVersions[ORSet[Edge[A, Id]], Edge[A, Id]]

or something like that? That doesn't make much sense to me. Also, I'm inside the CRDT, not an actor, if that matters.

Martin Krasser
@krasserm
Dec 19 2016 15:50
I didn't mean ConcurrentVersions usage in particular, rather the overall conflict resolution approach: if the vector timestamps are concurrent, use the system timestamps and, if needed, the emitter id for automated conflict resolution. A tree node will therefore need to store the vector timestamp, system timestamp and emitter id with its child nodes so that the "last writer" can be determined in case of a concurrent update. A general framework (which can also be modified to be used in the CRDT directly) is http://rbmhtechnology.github.io/eventuate/user-guide.html#detecting-concurrent-updates
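The resolution order above can be sketched as a pure function. These types are simplified stand-ins; Eventuate's real VectorTime and DurableEvent carry more structure:

```scala
// Simplified stand-in for the metadata a tree node would store per child.
final case class UpdateMeta(vectorTime: Map[String, Long], systemTimestamp: Long, emitterId: String)

// true if `a` happened strictly before `b` under the vector-clock partial order
def happenedBefore(a: Map[String, Long], b: Map[String, Long]): Boolean = {
  val keys = a.keySet ++ b.keySet
  keys.forall(k => a.getOrElse(k, 0L) <= b.getOrElse(k, 0L)) && a != b
}

// Last-writer-wins: a causally later update wins outright; for concurrent
// updates, break the tie by system timestamp, then emitter id, so every
// replica deterministically picks the same winner.
def lastWriterWins(a: UpdateMeta, b: UpdateMeta): Boolean =
  if (happenedBefore(b.vectorTime, a.vectorTime)) true
  else if (happenedBefore(a.vectorTime, b.vectorTime)) false
  else if (a.systemTimestamp != b.systemTimestamp) a.systemTimestamp > b.systemTimestamp
  else a.emitterId > b.emitterId
```

Because the tie-break falls back to the emitter id, two replicas applying the same pair of concurrent updates in either order converge to the same result.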
Alexander Semenov
@Tvaroh
Dec 19 2016 15:55
Got it now, thanks. How can I get the system timestamp and emitter id from within the CRDT btw?
Martin Krasser
@krasserm
Dec 19 2016 15:58
By implementing CrdtServiceOps.effect. DurableEvent contains all event metadata. See also https://krasserm.github.io/2016/10/19/operation-based-crdt-framework/
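A simplified sketch of how that metadata might flow through an effect-style update. Only the field names systemTimestamp and emitterId are taken from DurableEvent; everything else here is a hypothetical stand-in for the real CRDT:

```scala
// Hypothetical stand-ins; the real CRDTServiceOps.effect and DurableEvent
// live in the com.rbmhtechnology.eventuate packages.
final case class EventStub(emitterId: String, systemTimestamp: Long)
final case class ChildEntry(payload: String, systemTimestamp: Long, emitterId: String)

final case class TreeStub(children: Map[String, ChildEntry]) {
  // effect-style update: store the event metadata in the CRDT state so a
  // concurrent add of the same node id can be resolved by LWW downstream
  def addChild(nodeId: String, payload: String, event: EventStub): TreeStub =
    children.get(nodeId) match {
      case Some(e) if e.systemTimestamp > event.systemTimestamp ||
        (e.systemTimestamp == event.systemTimestamp && e.emitterId >= event.emitterId) =>
        this // existing entry wins
      case _ =>
        copy(children = children.updated(nodeId, ChildEntry(payload, event.systemTimestamp, event.emitterId)))
    }
}
```

In the real implementation, effect would receive the DurableEvent alongside the operation and pass these fields into the tree update.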
Alexander Semenov
@Tvaroh
Dec 19 2016 15:59
ah cool, I'm already using the effect method
thanks
Martin Krasser
@krasserm
Dec 19 2016 15:59
you're welcome!
Alexander Semenov
@Tvaroh
Dec 19 2016 16:32
looks like we can apply the same LWW strategy to resolve the add/add conflict when the parents are different