These are chat archives for atomix/atomix

8th
Dec 2015
Jordan Halterman
@kuujo
Dec 08 2015 01:49
oooh
hmm
didn’t know that happened
Alright… one last 1.0 PR for Atomix and all the features are in
Jordan Halterman
@kuujo
Dec 08 2015 03:17
all done
Copycat and Atomix both in a feature freeze until the 1.0 release
time for testing, bug fixes, and documentation
Henri van den Bulk
@hvandenb
Dec 08 2015 04:14
I'm trying to force a hard leader election with the below code. However, I keep getting an error stating client not open.
```java
/**
 * Starts the leader election process.
 */
private void election() {

    log.info("Starting the Leader Election process");

    try {
        // Create a leader election resource.
        election = server.get().create("election", DistributedLeaderElection.class).get(2, TimeUnit.SECONDS);

        // Register a callback to be called when this election instance
        // is elected the leader.
        election.onElection(this).join();

    } catch (InterruptedException | ExecutionException e) {
        log.error("Election was interrupted due to [{}]", e.getMessage());
    } catch (TimeoutException e) {
        log.error("Election creation timed out [{}]", e.getLocalizedMessage());
    }
}
```
Henri van den Bulk
@hvandenb
Dec 08 2015 04:21
@hvandenb This is how I initialize Atomix
```java
Address address = new Address(host, settings.getPort());
String logDir = settings.getLogLocation() + UUID.randomUUID().toString();
log.info("Data Log location: [{}]", logDir);

// Set up and initialize the Raft server.
Builder builder = AtomixReplica.builder(address, members);

builder.withTransport(new NettyTransport());
builder.withElectionTimeout(Duration.ofSeconds(settings.getElectionTimeout()));
builder.withHeartbeatInterval(Duration.ofSeconds(settings.getHeartBeat()));

Storage s = new Storage(logDir);
builder.withStorage(s);

server = Optional.of(builder.build());
server.get().open().thenRun(() -> {
    log.info("RAFT Server has started");
});
```
Henri van den Bulk
@hvandenb
Dec 08 2015 04:29
Also, how do I know with Atomix if I'm the leader? It seems in Copycat there is a getLeader method.
Jordan Halterman
@kuujo
Dec 08 2015 04:31
are you using the last release?
Henri van den Bulk
@hvandenb
Dec 08 2015 04:32
using 0.1.0-beta4
Jordan Halterman
@kuujo
Dec 08 2015 04:34
where does the “client not open” exception get thrown?
the first thing I think of is that the client code (server.get().create(…)) is being called before server.open() is finished, but that depends on how the code that’s not shown here is laid out
```java
server.get().open().thenRun(() -> {
  election = server.get().create("election", DistributedLeaderElection.class).get();
});
```
Henri van den Bulk
@hvandenb
Dec 08 2015 04:37
I think that might have been it. I have the async thenRun and then right after that I kick off the election. I just changed it to server.get().open().get() to make it sync and then it did not throw the error
Jordan Halterman
@kuujo
Dec 08 2015 04:39
Yep. That catches a lot of people. Gotta either make it sync or put the call in the callback. The potential for mixing sync/async code with CompletableFuture is making for difficult documentation that I’m still trying to figure out
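For illustration, here's a minimal sketch of the two options under the same beta4-style API used above (the wrapper class, method names, and the "election" resource name are just placeholders, not real project code):
```java
import io.atomix.Atomix;
import io.atomix.coordination.DistributedLeaderElection;

public class ElectionBootstrap {

  // `atomix` stands for the AtomixReplica or AtomixClient built elsewhere.
  void startElectionSync(Atomix atomix) {
    // Option 1: make it sync - block until open() completes, then create the resource.
    atomix.open().join();
    DistributedLeaderElection election =
        atomix.create("election", DistributedLeaderElection.class).join();
    election.onElection(epoch -> System.out.println("Elected for epoch " + epoch));
  }

  void startElectionAsync(Atomix atomix) {
    // Option 2: stay async - chain create() off the open() future so it only runs
    // once the client side is actually open.
    atomix.open()
        .thenCompose(a -> a.create("election", DistributedLeaderElection.class))
        .thenAccept(election ->
            election.onElection(epoch -> System.out.println("Elected for epoch " + epoch)));
  }
}
```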
Henri van den Bulk
@hvandenb
Dec 08 2015 04:39
for sure and hard to debug as well
Jordan Halterman
@kuujo
Dec 08 2015 04:41
on the other question...
Henri van den Bulk
@hvandenb
Dec 08 2015 04:44
in my implementation I'm using a gossip protocol to discover cluster members and continuously monitor the environment for changes. Is there a way to add members to the cluster while the server is open? I understand the impact with Raft and the replication side. However, I'd like to be able to add members when I discover additional ones through gossiping
Jordan Halterman
@kuujo
Dec 08 2015 04:44
The Raft-specific things like leader and term aren’t exposed in the Atomix class because it largely tries to hide Raft implementation details and force things like the concept of a leader to happen through resources. Atomix is an abstract class that’s shared by both AtomixClient and AtomixReplica, so a “node” (client or replica) can be stateful or stateless, but resources provide an interface for making a stateless “node” behave like a stateful one for something like a DistributedLeaderElection
meaning a client can participate in a leader election, allowing for much greater scalability than the Raft algorithm itself will allow
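As a rough sketch of that point (the method and resource name are placeholders; this just mirrors the create call used elsewhere in this chat):
```java
import io.atomix.Atomix;
import io.atomix.coordination.DistributedLeaderElection;

public class ElectionParticipant {

  // Works the same whether `node` is an AtomixClient or an AtomixReplica, since both
  // extend Atomix: the resource hides whether the node actually holds Raft state.
  void participate(Atomix node) {
    node.create("election", DistributedLeaderElection.class)
        .thenAccept(election ->
            election.onElection(epoch ->
                System.out.println("This node (client or replica) is leader for epoch " + epoch)));
  }
}
```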
Henri van den Bulk
@hvandenb
Dec 08 2015 04:47
@kuujo That makes sense. The question was part and parcel of trying to get the leader election working at the Atomix level. Now that I have that working, thanks to your pointer, I'll leverage that mechanism
Jordan Halterman
@kuujo
Dec 08 2015 04:47
gotcha
great
Henri van den Bulk
@hvandenb
Dec 08 2015 06:16
If I'm trying to create a DistributedAtomicValue, why is it waiting (in sync mode)? Is this caused by some minimum quorum that's needed?
```java
r = server.get().<DistributedAtomicValue<List<Boolean>>>create(name, DistributedAtomicValue.class).get();
```
Jordan Halterman
@kuujo
Dec 08 2015 06:43
yep
there’s a replicated state machine that manages all resources
actually, it could be possible to create them lazily though
that might be an interesting idea
but when get or create is called, a CreateResource command is committed to the Raft log. An internal state machine manages all the individual resource state machines and logical sessions for each state machine
I actually need to document how it works internally
basically, all Atomix does is manage a bunch of Copycat state machines. So, when you call create, what the CreateCommand that’s committed to the Raft log does is create a StateMachine for the specific resource type on each server. That state machine holds the state for the resource
This architecture actually makes it so individual resource StateMachines are completely independent of the Atomix ResourceManager. You can actually create a Copycat cluster with an Atomix MapState state machine. Atomix just multiplexes a bunch of state machines on a single Raft log and does the same for sessions. So, a single client opens only one session to the Atomix cluster, but Atomix creates a logical session for each resource to allow events from a resource’s replicated state machine to be sent to a specific object on the client
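Roughly, from the user's point of view (a hedged illustration in the same fragment style as the other snippets here; atomixA and atomixB stand for two different nodes' Atomix instances and the resource name is arbitrary):
```java
// Each node holds a single session to the cluster, but every named resource created
// through it is backed by its own state machine multiplexed onto the shared Raft log,
// with a logical session per resource for routing events back to the right object.
DistributedLeaderElection electionOnA =
    atomixA.create("election", DistributedLeaderElection.class).join();
DistributedLeaderElection electionOnB =
    atomixB.create("election", DistributedLeaderElection.class).join();

// Both objects refer to the same replicated election state machine, so only one of
// the two nodes will hold leadership at any given time.
electionOnA.onElection(epoch -> System.out.println("A is leader for epoch " + epoch));
electionOnB.onElection(epoch -> System.out.println("B is leader for epoch " + epoch));
```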
Henri van den Bulk
@hvandenb
Dec 08 2015 06:48
@kuujo nice that explains that
Still having issues with trying to create a hard election. It never seems to actually perform the election. Below are the log and code; am I missing something?
```
Binding to Address[/138.67.128.74:5000]
Listening at /138.67.128.74:5000
Server started successfully!
Address[/138.67.128.74:5000] - Transitioning to LEADER
Address[/138.67.128.74:5000] - Found leader Address[/138.67.128.74:5000]
Connecting: /138.67.128.74:5000
Connecting: /138.67.128.74:5000
RAFT Server has started
Starting the Leader Election process
We've been elected as the leader
Lost leadership!
Starting the Leader Election process
```
```java
private void election() {

    log.info("Starting the Leader Election process");

    try {
        // Create a leader election resource.
        election = server.get().create("election", DistributedLeaderElection.class).get(2, TimeUnit.SECONDS);

        // Register a callback to be called when this election instance
        // is elected the leader.
        election.onElection(this).join();

    } catch (InterruptedException | ExecutionException e) {
        log.error("Election was interrupted due to [{}]", e.getMessage());
    } catch (TimeoutException e) {
        log.error("Election creation timed out [{}]", e.getLocalizedMessage());
    }
}
```
As a test I ran the example on GitHub, but it has issues as well:
```
java -jar examples/leader-election/target/atomix-leader-election.jar logs/server3 localhost:5002 localhost:5000 localhost:5001

23:41:37.336 [copycat-server-localhost/127.0.0.1:5002] INFO i.a.c.server.state.ServerContext - Server started successfully!
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.IllegalStateException: failed to join the cluster
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
```
Jordan Halterman
@kuujo
Dec 08 2015 06:51
yeah I’m actually going to play with that right now
Henri van den Bulk
@hvandenb
Dec 08 2015 06:52
@kuujo great
Jordan Halterman
@kuujo
Dec 08 2015 06:54
yeah looks like I broke it… let’s see
Henri van den Bulk
@hvandenb
Dec 08 2015 06:55
ok let me create an issue for it as well
ok #77 has been created for this
Jordan Halterman
@kuujo
Dec 08 2015 07:15
nice :-) I love the links
ugh integrations aren’t working
Henri van den Bulk
@hvandenb
Dec 08 2015 07:16
which integration? It's nice to be able to see everything directly from #gitter
Jordan Halterman
@kuujo
Dec 08 2015 07:16
yeah
I’ve setup the GitHub integration several times and it never seems to show up
okay, I got the leader election example caught up with master, but running into a ClassCastException in the event message that it sends for elections that I can’t reproduce in tests so… have to track that down before the example will work correctly again
java.lang.ClassCastException: io.atomix.resource.InstanceEvent cannot be cast to java.lang.Long
    at io.atomix.coordination.DistributedLeaderElection$$Lambda$150/1413246829.accept(Unknown Source) ~[na:na]
    at io.atomix.catalyst.util.Listeners.accept(Listeners.java:70) ~[atomix-leader-election.jar:na]
    at io.atomix.copycat.client.session.ClientSession.handlePublish(ClientSession.java:815) ~[atomix-leader-election.jar:na]
    at io.atomix.copycat.client.session.ClientSession$$Lambda$120/688315958.handle(Unknown Source) ~[na:na]
    at io.atomix.catalyst.transport.LocalConnection.lambda$receive$5(LocalConnection.java:109) ~[atomix-leader-election.jar:na]
    at io.atomix.catalyst.transport.LocalConnection$$Lambda$121/1045876739.run(Unknown Source) ~[na:na]
    at io.atomix.catalyst.util.concurrent.Runnables.lambda$logFailure$12(Runnables.java:20) ~[atomix-leader-election.jar:na]
    at io.atomix.catalyst.util.concurrent.Runnables$$Lambda$4/110431793.run(Unknown Source) [atomix-leader-election.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_11]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_11]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_11]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_11]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_11]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_11]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]
Henri van den Bulk
@hvandenb
Dec 08 2015 07:18
nice
Jordan Halterman
@kuujo
Dec 08 2015 07:23
ahh I know how to get it actually one sec
Jordan Halterman
@kuujo
Dec 08 2015 07:41
ahh there we go
GitHub integration seems to be working
So, the leader election example in master now works
Henri van den Bulk
@hvandenb
Dec 08 2015 07:42
cool I'll do a pull and check it out, thanks
Jordan Halterman
@kuujo
Dec 08 2015 07:42
there’s still a lot of work to do on the code that’s in both Copycat and Atomix master branches with the major changes that went into them - namely snapshots and resource recovery - so they’re still a little wonky
here’s what I did:
cd examples/leader-election
and then…
java -jar target/atomix-leader-election.jar localhost:5000 localhost:5001 localhost:5002
java -jar target/atomix-leader-election.jar localhost:5001 localhost:5000 localhost:5002
java -jar target/atomix-leader-election.jar localhost:5002 localhost:5000 localhost:5001
you should see one of the nodes get Elected leader!
kill that node and you should see another node get Elected leader!
you can enable DEBUG logging if you’re interested in all the crazy things Copycat is doing internally :-P but it has INFO logging by default
an example of some of the cleanup that needs to be done is when you kill a node, the leader will start logging failed configuration messages like crazy
that needs to be cut down
the bug I just fixed
those things
but within the next few days it should be back to where it should be and I’ll be pushing a release candidate as soon as some more significant testing goes into it after the cleanup
you might also see a Missing snapshot file bug
Henri van den Bulk
@hvandenb
Dec 08 2015 07:47
nice work, let me check it out
Jordan Halterman
@kuujo
Dec 08 2015 07:48

One relevant change on the Atomix side: the get and create methods are now overloaded with a new ResourceType argument:

```java
DistributedLeaderElection election = atomix.create("election", DistributedLeaderElection.TYPE).get();
```

Also, servers and replicas now support separate client and server Addresses. So, clients can connect to one Address and servers to each other via another set of Addresses. This will be used in the future to expose an HTTP API and increase concurrency.
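A hedged sketch of what that could look like on the replica builder (I'm assuming a builder overload that takes the client Address followed by the server Address, as described above; ports, paths, and members are placeholders):
```java
// Clients connect on one port, replicas talk to each other on another (placeholder values).
Address clientAddress = new Address("10.0.0.1", 6000);
Address serverAddress = new Address("10.0.0.1", 5000);

AtomixReplica replica = AtomixReplica.builder(clientAddress, serverAddress, members)
    .withTransport(new NettyTransport())
    .withStorage(new Storage("/tmp/atomix-logs")) // placeholder log directory
    .build();

replica.open().thenRun(() -> System.out.println("Replica listening for clients and other servers"));
```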

Henri van den Bulk
@hvandenb
Dec 08 2015 07:49
Ah, the thinking being to have a Client that exposes the REST API?
and have it in turn work with a set of servers
Jordan Halterman
@kuujo
Dec 08 2015 07:52
Not totally sure how it will look yet… but it will be a little complicated because of the coordination involved in Raft. The initial implementation of REST may have fewer features. Copycat’s clients are event driven so things like locks and leader elections can be done with messages pushed from servers to clients when they happen. That’s more complicated over HTTP
ditto session tracking, which provides linearizable semantics in Copycat clients
Henri van den Bulk
@hvandenb
Dec 08 2015 07:55
I do see some errors when running the example:
```
0:54:14.463 [copycat-server-localhost/127.0.0.1:5000] ERROR i.a.c.u.c.SingleThreadContext - An uncaught exception occurred
java.lang.IndexOutOfBoundsException: length cannot be greater than remaining bytes in the buffer
    at io.atomix.catalyst.buffer.AbstractBuffer.skip(AbstractBuffer.java:299) ~[atomix-leader-election.jar:na]
    at io.atomix.copycat.server.storage.snapshot.SnapshotReader
```
Henri van den Bulk
@hvandenb
Dec 08 2015 08:03
However, one of the servers is elected leader.
Jordan Halterman
@kuujo
Dec 08 2015 08:08
yep… snapshot reading is buggy
it took us a ton of testing and debugging to finally get the log to work really reliably
snapshots aren’t as complicated but just need more time and testing to work out the bugs before they can be released
massaging
now, once one node is elected, you should be able to kill that node and see another one get elected
assuming there’s still a quorum available
I started three nodes, waited for a leader to get elected, killed the leader, and watched for another leader to get elected
then you can restart the killed leader and kill the new leader and so on
leader election only works on failures right now, but we could conceivably allow a DistributedLeaderElection to step down voluntarily if there’s a use case for it
Jordan Halterman
@kuujo
Dec 08 2015 08:16
I guess there is
Henri van den Bulk
@hvandenb
Dec 08 2015 08:25
so leader election only kicks in when a leader is killed. I thought the goal with the Raft protocol, which is not the same as DistributedLeaderElection, is to have random leader elections happen; in other words, a leader is only a leader for a certain amount of time
Jordan Halterman
@kuujo
Dec 08 2015 08:49
fixed that IndexOutOfBoundsException
oop I missed that last message sorry
well, I think the behavior of DistributedLeaderElection is certainly up for debate
Jordan Halterman
@kuujo
Dec 08 2015 08:57

The goal of DistributedLeaderElection is the same as the Raft leader election protocol, and that is to ensure that a leader - some leader - is available as frequently as possible. This is for coordinating clusters. All nodes that create a DistributedLeaderElection instance can call election.onElection(…) and Atomix will ensure that only one leader is elected at any given time, but that some leader exists as frequently as possible. So, if for instance I’m building a Kafka, each topic partition would create a separate DistributedLeaderElection instance:

```java
DistributedLeaderElection partitionElection = atomix.create("topic.partition.1", DistributedLeaderElection.TYPE).get();
partitionElection.onElection(epoch -> {
  DistributedMessageBus messageBus = atomix.create("topic.partition.1.messages", DistributedMessageBus.TYPE).get();
  messageBus.open(new Address("localhost", 1234)).join();
  messageBus.consumer("message", this::doStuff);
});
```

In Kafka, each topic is split into some number of partitions, and each partition elects a leader. All writes to a partition go to the leader. So, this is a great example use case for leader elections. But in practice, if I were creating a Kafka I would likely want to include metadata about the leader that was elected. Maybe that would be a good thing to add to the DistributedLeaderElection resource. For instance, in this case it might be a server Address to which to send writes for the partition.
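Until something like that exists, one workaround sketch (reusing the DistributedAtomicValue resource from earlier in this chat with the Class-based overload; the names, the address, and whether an Address value serializes cleanly here are all assumptions on my part):
```java
DistributedLeaderElection partitionElection =
    atomix.create("topic.partition.1", DistributedLeaderElection.class).join();
DistributedAtomicValue<Address> partitionLeader =
    atomix.<DistributedAtomicValue<Address>>create("topic.partition.1.leader", DistributedAtomicValue.class).join();

partitionElection.onElection(epoch -> {
  // Advertise where writes for this partition should go (placeholder address).
  partitionLeader.set(new Address("10.0.0.1", 1234)).join();
});

// Writers elsewhere can then look up the current partition leader:
Address writeTarget = partitionLeader.get().join();
```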

Jordan Halterman
@kuujo
Dec 08 2015 09:02

Also, in practice, in a Kafka-like system I might actually just use a single leader election to coordinate all the partitions to ensure they’re balanced across the cluster, and use a MembershipGroup to assign partitions to members.

```java
DistributedMembershipGroup group = atomix.create("group", DistributedMembershipGroup.TYPE).get();
group.join().join();

DistributedLeaderElection election = atomix.create("coordinator", DistributedLeaderElection.TYPE).get();
election.onElection(epoch -> {
  for (Topic topic : topics) {
    for (int i = 0; i < topic.partitions().size(); i++) {
      group.members().get(i % group.members().size()).send("Assign partition " + i);
    }
  }
});
```

hax, but you get the idea

This actually makes me think there should be a messaging feature on DistributedMembershipGroup
```java
group.onJoin(newMember -> {
  rebalancePartitions();
});
group.onLeave(oldMember -> {
  rebalancePartitions();
});
```
etc
What’s essential about DistributedLeaderElection here is that:
  • Rebalancing of partitions is coordinated by a single node
  • A node is available to rebalance partitions (a leader exists) as frequently as possible
Henri van den Bulk
@hvandenb
Dec 08 2015 09:31
nice, I'm playing with the latest code. Thanks for taking the time to provide the detailed background on Atomix.
Jordan Halterman
@kuujo
Dec 08 2015 09:31
no problem
any time
I’m always up for explaining things
it helps me figure out how to go about documenting things, and hopefully helps find potential usability issues
Henri van den Bulk
@hvandenb
Dec 08 2015 09:34
What's the long-term goal with Atomix? There are many others out there. I do like the async approach of this solution.
Jordan Halterman
@kuujo
Dec 08 2015 09:39
I’d say the main reason behind it was my experience with both ZooKeeper and Hazelcast. ZooKeeper provides the coordination layer for a ton of major distributed systems projects, but that implies that they have to rely on an external dependency. So, I wanted to build something that could be embedded in projects as a component instead. The other significant issue with ZooKeeper is I believe its primitives are too low level, and the evidence for that is the need for Apache Curator. Hazelcast has done a great job providing high-level user-friendly APIs, so Atomix is certainly influenced by it. Short term goals are incredible stability. The two major medium term features are HTTP and sharding, which will be huge for accessibility and performance respectively.
Basically, it was largely influenced by my experience with ZooKeeper and Hazelcast and my work on Vert.x. So, the long term goals are: stability like ZooKeeper, usability like Hazelcast, and use cases like Vert.x (cluster management) and plenty of other use cases as well. My favorite use case example is always Kafka, perhaps because I know its internals.
I think a long, long term goal may be to expand it beyond the Raft algorithm to facilitate different types of replication, but that’s not even in my mind at this point
I’ve experimented some with that idea in the past and it’s complex
So, the compromise was to create as flexible a Raft implementation as possible, with support for eventually consistent reads and the primitives to build weaker replication algorithms on top of it
Jordan Halterman
@kuujo
Dec 08 2015 09:53
Vert.x was a big influence on async