These are chat archives for atomix/atomix

6th
Apr 2016
Jordan Halterman
@kuujo
Apr 06 2016 00:03
That's true when query 1 is retried, particularly when we include reads from followers. Queries on leaders can be sequenced, but queries on followers can't since whether a query is applied on a follower is local to that follower. The follower would have to do some sort of ack through the leader to allow other followers to progress, and that would make queries effectively as expensive as writes
If that makes sense. It makes sense on my drawing :-)
If we exclude queries from retries (or retries with sequential guarantees) then the first query attempt can be sequenced
Madan Jampani
@madjam
Apr 06 2016 00:12
So sequential guarantees can only be provided if reads happen from one place (leader)
If reads from followers are allowed, a client will not be guaranteed to see monotonically consistent reads.
For example: A (leader), B, C are the nodes. The client is initially connected to B (which is up to date) and then reconnects to C (which is way behind). Its reads from C will go back in time.
Jordan Halterman
@kuujo
Apr 06 2016 00:26
Well... There is a way to ensure reads don't go back in time. Read responses return the read index, and the last read index is sent with requests. If a client switches servers and issues a read, the server will wait until its state machine reaches at least that index before evaluating the read.
ZooKeeper does the same thing
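Roughly, the follower-side check looks something like this sketch (made-up names, not Copycat's actual classes): the follower just holds a query until its state machine has caught up to the highest index the client has already observed.
```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch only: hold queries until the local state machine reaches the client's read index.
final class FollowerQueryHandler {
  private volatile long lastAppliedIndex; // index of the last entry applied to the state machine
  private final ConcurrentLinkedQueue<PendingQuery> waiting = new ConcurrentLinkedQueue<>();

  /** Called when a client submits a query along with the last index it has seen. */
  void handleQuery(long clientReadIndex, Runnable evaluate) {
    if (lastAppliedIndex >= clientReadIndex) {
      evaluate.run(); // safe: local state is at least as new as anything the client has read
    } else {
      waiting.add(new PendingQuery(clientReadIndex, evaluate)); // wait for the log to catch up
    }
  }

  /** Called after each entry is applied to the local state machine. */
  void onApply(long index) {
    lastAppliedIndex = index;
    for (Iterator<PendingQuery> it = waiting.iterator(); it.hasNext(); ) {
      PendingQuery query = it.next();
      if (query.readIndex <= index) {
        it.remove();
        query.evaluate.run();
      }
    }
  }

  private static final class PendingQuery {
    final long readIndex;
    final Runnable evaluate;
    PendingQuery(long readIndex, Runnable evaluate) {
      this.readIndex = readIndex;
      this.evaluate = evaluate;
    }
  }
}
```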
The problem I'm describing is really with concurrent reads from a single client. If a client issues read 1 and then read 2, and read 1 fails but read 2 succeeds, then when read 1 is retried it will see state after read 2 even though it was issued before read 2
The same is not true for commands. In that case, command 1 will still be applied before command 2 since the leader will wait for the missing command 1 before committing command 2
The same could be done locally for queries, but they won't be ordered with other operations (commands and events)
except on the leader
Madan Jampani
@madjam
Apr 06 2016 00:33
Is there any way to provide that guarantee apart from writing queries to the Raft log?
disabling retries will help obviously
Jordan Halterman
@kuujo
Apr 06 2016 00:41
Not that I’ve come up with… not allowing queries to be retried is the only option I’ve found. When I try to find a solution that sequences queries with commands, with queries being evaluated on followers and commands on leaders, I find holes. I’ve explored some options where the client submits a sequence number for all operations - commands and queries. Perhaps the client submits an operationSequence number that sequences all operations, and a commandSequence number that the leader uses to sequence commands. When a command is received on the leader, it’s written to the log in sequential order to ensure commands occur in program order. When a follower receives a query, it waits for the prior operationSequence number to be received (query) or applied (command) before evaluating the query. But the problem is, the servers to which the client is not connected don’t have any way of knowing when it’s safe to apply commands from that client to their state machines. Only the follower to which the client is connected knows e.g. that it can apply query 2 after command 1 and query 4 after command 3. But other followers don’t know that queries 2 and 4 were successfully handled on another server. If they continue applying operations because the client is not connected to them, and the client then switches servers, the new server may have already progressed beyond the client
if that makes sense
then we add events into that
what I’m reading in ZK seems to imply it doesn’t attempt to retry queries either
Jordan Halterman
@kuujo
Apr 06 2016 01:11
Also, adding events into these complex solutions for sequencing queries and commands introduces a lot more complexity. It seems to be easy to ensure commands/queries/events all occur in sequential order if we exclude query retries for reads from followers. Query retries on followers just mess everything up
Jordan Halterman
@kuujo
Apr 06 2016 01:21
I suppose if we exclude query retries, sequencing commands/queries/events becomes fairly easy. Command and query responses are sent with the last applied index to ensure state does not go back in time when switching servers. Queries are submitted with the last submitted command sequence number. Both of these things are done already, and an ordered protocol ensures queries will be sequenced with commands. A query will arrive on a follower before the next command is proxied to the leader. At this point, either the server to which the client is connected could handle sending responses and events in sequential order, or an event index can be sent with every response. If the client receives a response before the associated last event, it waits for the event.
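Roughly, the client-side bookkeeping is something like this sketch (hypothetical request types, not Copycat's actual protocol classes):
```java
// Sketch: queries carry the last command sequence number and the last applied index
// seen in any response. All names here are hypothetical.
final class ClientSessionState {
  private long commandSequence;   // sequence number of the last command submitted
  private long lastAppliedIndex;  // highest state machine index seen in any response

  /** Commands are numbered so the leader can write them to the log in program order. */
  CommandRequest nextCommand(byte[] command) {
    return new CommandRequest(command, ++commandSequence);
  }

  /** Queries carry the last command sequence so a follower can wait for that command to be
      applied, and the last applied index so reads never go back in time when switching servers. */
  QueryRequest nextQuery(byte[] query) {
    return new QueryRequest(query, commandSequence, lastAppliedIndex);
  }

  /** Every response carries the state machine index at which the operation was applied. */
  void onResponse(long responseIndex) {
    lastAppliedIndex = Math.max(lastAppliedIndex, responseIndex);
  }

  // Hypothetical request types for illustration only.
  record CommandRequest(byte[] command, long sequence) {}
  record QueryRequest(byte[] query, long commandSequence, long lastAppliedIndex) {}
}
```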
Madan Jampani
@madjam
Apr 06 2016 02:02
I think you are right. Removing automatic query retries from the system does help reduce complexity. A client can still implement retries on top if they so desire. However, that would be an explicit action on the part of the client, and from the client's perspective the retried query is treated as a new operation and therefore appropriately sequenced.
Jordan Halterman
@kuujo
Apr 06 2016 02:10
Exactly
Jordan Halterman
@kuujo
Apr 06 2016 21:20
I did a lot of cleanup of operations last night. Commands and queries will now be properly sequenced on leaders and followers, including concurrent operations that e.g. write to a leader and read from a follower or vice versa. With proper ordering, I believe sequencing events is now a fairly trivial task. Operation responses will contain both the state machine index and the last eventIndex published prior to the operation. These can be used in conjunction with each event's eventIndex to ensure operations occur on the client in the order in which they occurred on the server. If a client receives an operation response with an eventIndex greater than the last received event, it waits for the event. If a client receives an event with an eventIndex greater than the last response index, it waits for the response. If no request from the client is pending, it completes the event (no sequencing needed). This results in a session event system that's much more efficient than the current implementation while improving sequential consistency to span all operations. It also results in a session event system that's still more fault tolerant than ZooKeeper's. From what I've found in my research, ZK does not guarantee a client will see an event when switching servers. That is, it essentially publishes and forgets events, but Copycat ensures that events can be reproduced by the commands that created them, and events are effectively replicated so clients can receive lost events when switching servers. Once this is implemented, I think we can separately explore my claims that queries cannot be retried.
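The client-side rule might look something like this sketch (hypothetical names; it only shows the "response waits for missing events" half, and the symmetric case would be handled the same way):
```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Sketch: a response whose eventIndex is ahead of the last event the client has received
// is held until that event arrives. Names are hypothetical.
final class EventSequencer {
  private long lastEventIndex; // eventIndex of the last event delivered to the application
  private final PriorityQueue<PendingResponse> held =
      new PriorityQueue<>(Comparator.comparingLong((PendingResponse r) -> r.eventIndex));

  /** A response arrives carrying the index of the last event published before the operation. */
  synchronized void onResponse(long eventIndex, Runnable complete) {
    if (eventIndex <= lastEventIndex) {
      complete.run();                                       // all prior events were delivered
    } else {
      held.add(new PendingResponse(eventIndex, complete));  // wait for the missing event(s)
    }
  }

  /** An event arrives from the cluster with its eventIndex. */
  synchronized void onEvent(long eventIndex, Runnable deliver) {
    deliver.run();
    lastEventIndex = Math.max(lastEventIndex, eventIndex);
    while (!held.isEmpty() && held.peek().eventIndex <= lastEventIndex) {
      held.poll().complete.run();   // release responses that were waiting on this event
    }
  }

  private static final class PendingResponse {
    final long eventIndex;
    final Runnable complete;
    PendingResponse(long eventIndex, Runnable complete) {
      this.eventIndex = eventIndex;
      this.complete = complete;
    }
  }
}
```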
brotherdetjr
@brotherdetjr
Apr 06 2016 21:25
Hi, I'm new to Copycat and Raft in general and haven't read all the docs yet, so my question may be stupid or already answered on some page on the website. Is there a way to roll back uncommitted operations? And to submit multiple operations within the same commit? Thanks!
Another question: can Copycat be used when nodes are in different datacenters with an average latency of about 200ms?
Jordan Halterman
@kuujo
Apr 06 2016 21:28
All Commands are atomic - they'll either be completely applied or not applied at all - within the context of a session. If a client's command succeeds but the client doesn't receive a response, Copycat's client will automatically resubmit it and ensure it's applied to the system exactly once at the (logical) time it was originally submitted by the client if that makes sense.
You can submit multiple concurrent operations (commands and queries) from a single client, and they're guaranteed to be applied in the order in which they were submitted by the client. Because this feature exists, there's no separate mechanism for batching multiple commands together
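Conceptually, the per-session bookkeeping on the servers is something like this sketch (hypothetical names; the real session code also handles waiting for missing sequence numbers and trimming old results):
```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch of exactly-once command application per session: the state machine remembers the
// last client-assigned sequence number it applied and caches results, so a resubmitted
// command returns the original result instead of being applied twice.
final class ServerSessionSketch {
  private long lastAppliedSequence;
  private final Map<Long, Object> cachedResults = new HashMap<>();

  synchronized Object apply(long sequence, Supplier<Object> operation) {
    if (sequence <= lastAppliedSequence) {
      return cachedResults.get(sequence); // duplicate of an already-applied command
    }
    Object result = operation.get();      // apply the command to the state machine
    lastAppliedSequence = sequence;
    cachedResults.put(sequence, result);
    return result;
  }
}
```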
brotherdetjr
@brotherdetjr
Apr 06 2016 21:30
And what if one of the applied commands from the batch fails?
Jordan Halterman
@kuujo
Apr 06 2016 21:31
It would work fine across data centers, but TBH a change just needs to be made to NettyTransport to make the request timeout configurable. You'd just have to set a larger election timeout. The challenge is deciding where to put the nodes. If the majority of the nodes are in one data center and the connection is lost across DCs, only clients in the side with the majority will be able to commit writes
There is no notion of command failure... Only a notion of session failure. A command will eventually complete, in the proper order, some time between request and response as long as the session can be kept alive. If the session expires (because the client can't communicate with the cluster to keep its session alive), a command may or may not be applied, and there's no way to prevent a command from being applied once a session has expired: if it's committed, the command by definition happened prior to the session expiring.
brotherdetjr
@brotherdetjr
Apr 06 2016 21:35
Thanks for the answers. One more question, if you don't mind. Are there any means to perform quite big priming? When starting the node, I need to load ~7GB.
*means or maybe caveats
Sorry, I didn't get it regarding the failures. OK, we don't have command failures. Why then call commit.close() explicitly? Can't it be done in some wrapping method?
Jordan Halterman
@kuujo
Apr 06 2016 21:47
Commit.close() does not relate to the success or failure of a command in any way. By the time it's applied to the state machine, the command is already guaranteed to be stored on a majority of the cluster and is by definition successful. State machines can throw exceptions and what not to fail a command, but ultimately an exception is as valid an output as the return value of the command. What the release() or close() methods are for is so Copycat knows how to manage the log. close()ing a commit indicates to Copycat that it doesn't need to be retained in the log since it either no longer contributes to the state of the system or its state is/will be stored in a snapshot. Typically, state machines should just implement Snapshottable and release() commits immediately after they're applied. But Copycat supports incremental compaction as well where state machines hold on to Commits as long as they contribute to the state machine's state and therefore need to be retained in the log. For example, a map state machine can hold on to a Put command as long as that command is not superseded by another Put command on the same key. This is how Atomix maps are implemented: http://github.com/atomix/atomix/tree/master/collections/src/main/java/io/atomix/collections/state/MapState.java
Copycat also uses pools for the Commits, and when a Commit is closed it's released back to a commit pool (since Commits can be long living objects)
The gist is, Snapshottable state machines should almost always close() commits immediately since they'll be stored in the snapshot. But state machines that use incremental compaction may hold onto them and close them when it's safe for them to be removed from the Raft log
Snapshots are easier, but incremental cleaning may be more efficient
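The incremental-compaction pattern looks roughly like this (a simplified sketch with stand-in types, not the actual MapState code; tombstone handling in the real implementation is more involved):
```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the "hold a Commit until it's superseded" pattern. The Commit and
// command types here are stand-ins, not Copycat's real API.
final class MapStateSketch {
  // The map stores the Commit itself, kept open as long as it defines current state.
  private final Map<String, Commit<Put>> map = new HashMap<>();

  Object put(Commit<Put> commit) {
    Commit<Put> previous = map.put(commit.operation().key, commit);
    if (previous != null) {
      previous.close(); // the old Put no longer contributes to state; it can be compacted
    }
    return null;
  }

  Object remove(Commit<Remove> commit) {
    Commit<Put> removed = map.remove(commit.operation().key);
    if (removed != null) {
      removed.close();  // the removed Put no longer contributes to state
    }
    commit.close();     // real implementations treat removes as tombstones with extra care
    return null;
  }

  // Minimal stand-ins for illustration.
  interface Commit<T> { T operation(); void close(); }
  static final class Put { final String key; final Object value; Put(String key, Object value) { this.key = key; this.value = value; } }
  static final class Remove { final String key; Remove(String key) { this.key = key; } }
}
```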
On the topic of size - it's of course important to remember that the state of the system must fit in memory. The log can certainly be 7GB
so long as all the commands aren't needed to represent the state of the system
Commands that are not needed by the state machine will be periodically compacted from the log
Err removed from the log during compaction
Jordan Halterman
@kuujo
Apr 06 2016 21:53
If you have a state machine that needs to store 7GB of data, what you want is a persistent state machine. In that case, operations would go through the Copycat log (could use StorageLevel.MEMORY for this) and would be written to disk in the state machine. Reads would return data from disk rather than memory. This is a perfectly valid use case. But there are some challenges in replicating snapshots to followers that Copycat may not be well suited for without some minor changes
In a persistent state machine, the state machine becomes the snapshot. So, when a new server joins the cluster, the state machine's persistent state is what would be sent to the new server
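The idea is roughly this sketch (hypothetical names, not Copycat's actual StateMachine API): commands still flow through the replicated log, but applied state lives on disk, and that on-disk state effectively is the snapshot.
```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of a persistent key/value state machine that applies committed operations to disk.
final class PersistentKeyValueStore {
  private final Path dir;

  PersistentKeyValueStore(Path dir) throws IOException {
    this.dir = Files.createDirectories(dir);
  }

  /** Apply a committed put by writing the value to disk; the log entry can then be released. */
  void applyPut(String key, String value) throws IOException {
    Files.write(dir.resolve(key), value.getBytes(StandardCharsets.UTF_8));
  }

  /** Queries read from disk rather than from an in-memory structure. */
  String applyGet(String key) throws IOException {
    Path file = dir.resolve(key);
    return Files.exists(file) ? new String(Files.readAllBytes(file), StandardCharsets.UTF_8) : null;
  }
}
```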
brotherdetjr
@brotherdetjr
Apr 06 2016 21:57
Yep. Thanks. Isn't replication overhead too big? 7GB is quite an amount, and I'd like to minimize priming time as much as possible.
Jordan Halterman
@kuujo
Apr 06 2016 22:00
It is a lot… but Raft does replicate pretty efficiently (in batches). The bottleneck is that you’re pushing 7GB through a single leader rather than potentially writing to a partitioned system
brotherdetjr
@brotherdetjr
Apr 06 2016 22:03
ah, the last question for today :) what about partitioning/sharding? Should I create a number of independent clusters? Any smarter approaches? Or do Raft and CP systems in general not allow for sharding?
Jordan Halterman
@kuujo
Apr 06 2016 22:13
You can do sharding certainly. Copycat is somewhat designed to support it (through a custom Transport), but depending on the number of shards it may or may not be wise. The problem is, sessions are duplicated across shards. For each shard, the overhead of maintaining a client's session increases since there's no way to do it consistently across shards. Also, each shard has overhead for maintaining the cluster. If you have a few shards all that's fine. But if Copycat were designed from the ground up for shards, there could be optimizations like piggybacking append RPCs for multiple shards on a single request, sharing a session across shards with e.g. 2PC, etc
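A rough sketch of the "independent cluster per shard" approach on the client side (ShardClient is a hypothetical stand-in; in practice each shard would be its own Copycat cluster with its own client):
```java
import java.util.List;

// Sketch of client-side routing across independent per-shard clusters.
final class ShardedRouter<K> {
  private final List<ShardClient> shards; // one client per independent Raft cluster

  ShardedRouter(List<ShardClient> shards) {
    this.shards = shards;
  }

  /** Route an operation to the shard that owns the key. */
  ShardClient shardFor(K key) {
    int index = Math.floorMod(key.hashCode(), shards.size());
    return shards.get(index);
  }

  interface ShardClient { /* submit commands/queries to that shard's cluster */ }
}
```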