These are chat archives for atomix/atomix

18th
Jul 2017
@kerbymart
@kerbymart
Jul 18 2017 02:18
@kuujo thanks for the explanation. I have another question: what does Polling members mean for a Copycat Server when clients are unable to connect to it?
@kerbymart
@kerbymart
Jul 18 2017 03:11
I managed to fix this by clearing the logs and starting the copycat-server again
@kerbymart
@kerbymart
Jul 18 2017 11:59
Hello @kuujo, I did another test with the Copycat-based KVS: I saved a big file and then deleted it, but the log file did not get smaller even though the value of the key is already null.
Johno Crawford
@johnou
Jul 18 2017 14:22
@kerbymart maybe you could put together a reproducer
@kerbymart
@kerbymart
Jul 18 2017 14:23
@johnou sure, I am going to push the code to GitHub as an open-source key-value store on top of Atomix
Johno Crawford
@johnou
Jul 18 2017 14:24
@kerbymart iirc Atomix already has key value store impls? https://github.com/atomix/atomix/tree/master/core/src/main/java/io/atomix/primitives/map
@kerbymart
@kerbymart
Jul 18 2017 14:31
@johnou interesting. What I mean by KV is something that runs on a servlet container with REST API access to it, or access via a KV client
...more like a datastore that apps can use at a high level for storing users, profiles, and other app data
uhm, are there any database implementations that use Atomix at the moment, @johnou?
Johno Crawford
@johnou
Jul 18 2017 14:33
not to my knowledge
buko
@buko
Jul 18 2017 14:47
Atomix 2.0 isn't out yet right?
Johno Crawford
@johnou
Jul 18 2017 14:47
alpha
buko
@buko
Jul 18 2017 14:48
And there's not much point building anything on 1.0, right? I presume the APIs are pretty different?
Jordan Halterman
@kuujo
Jul 18 2017 14:59

@kerbymart I've had various talks with database developers about using Copycat/Atomix in databases, but haven't followed up on them so I have no idea where it's being used. We use it in ONOS (http://onosproject.org) to store the vast majority of the cluster's state. ONOS is not itself a database, it's a network operating system (SDN) that runs partitioned Copycat clusters internally.

Atomix 2.0 is growing out of ONOS's distributed core. ONOS built a lot on top of Atomix that's now being moved back into Atomix itself (partitioning, messaging, failure detection, eventual consistency, etc).

As for the logs, when a key is deleted from a map it won't immediately be removed from disk. That model is not well suited for a commit log. It's incredibly inefficient to remove a commit from the middle of a log on a delete. Instead, a background process will eventually compact the logs and remove unneeded entries. In Copycat, that happens when logs roll over to new segments. In Atomix, when logs roll over to new segments, a snapshot of each state machine is taken and written to disk and then old segments are deleted.

The Polling members message is a node trying to get elected. Before a node starts a new election it will poll the other nodes to ask if they would vote for it in an election. This is referred to as the pre-vote protocol in the Raft community. If a node is repeatedly polling then that seems to indicate that it can't win an election and the node that can win is not running.
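
To make the compaction model concrete, here is a minimal sketch of a snapshot-compacted state machine against Copycat 1.x's Snapshottable interface (the map state here is illustrative):

import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachine;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;

import java.util.HashMap;
import java.util.Map;

public class MapStateMachine extends StateMachine implements Snapshottable {
    private Map<Object, Object> map = new HashMap<>();

    @Override
    public void snapshot(SnapshotWriter writer) {
        // Persist only live state; deleted keys simply aren't written,
        // so their commits can be reclaimed from old log segments.
        writer.writeObject(map);
    }

    @Override
    public void install(SnapshotReader reader) {
        // Restore state from the most recent snapshot on recovery.
        map = reader.readObject();
    }
}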

Johno Crawford
@johnou
Jul 18 2017 15:01
@kuujo will there be a video feed for your talk?
Jordan Halterman
@kuujo
Jul 18 2017 15:01
@buko The Atomix 2.0 Raft implementation is done and will be cleaned up and released in the next few weeks, then we'll move over the primitives and state machines
I think it should be posted online
Afterwards
Jordan Halterman
@kuujo
Jul 18 2017 15:26
It will review the architectural flaws in Copycat that led to refactoring and discuss how those flaws were addressed in Atomix.
@kerbymart
@kerbymart
Jul 18 2017 15:37
@johnou here's the Atomix-based KV project: https://github.com/dotweblabs/astro
@kuujo thanks for the explanation. In the Astro project we experience this Polling members issue; if you have time to check our use case, perhaps you can guide us on the best way to utilize Atomix/Copycat
Jordan Halterman
@kuujo
Jul 18 2017 15:42
typically it means there’s no leader that’s sending AppendRequests to the node. When a follower doesn’t receive an AppendRequest for more than an election timeout, it attempts to start a new election by polling members. If it’s repeatedly polling nodes, that indicates that the polling node can’t get elected but is still not receiving AppendRequests from a leader. If there’s a leader and another node is repeatedly polling, that indicates a communication problem.
this is our really crude implementation; it could be because we are running Copycat on a servlet
So when Polling members occurs it means the copycat-server can't get elected. Is there a way to prevent this, or to force the election? Not sure I totally understand it, but I checked port 5000 on my machine and nothing is running, so why would the server not run?
Jordan Halterman
@kuujo
Jul 18 2017 15:50
It has to be able to communicate with a majority of the cluster first and foremost to get elected. So if you start one node in a 3 node cluster, it will poll repeatedly since it can’t get elected. The other reason it would poll repeatedly is that it can’t win an election, meaning its log is not up-to-date enough to win. If we allowed elections to be forced then we’d lose all consistency guarantees. Leader election is integral to the safety of the cluster. Only a node with all committed entries can be elected.
@kerbymart
@kerbymart
Jul 18 2017 15:55
Okay, so it means the problem with our code is that the current node is looking for the other nodes? I wonder why this is, since we only set one address: String[] address = {DEFAULT_ADDR + ":" + DEFAULT_PORT};
Jordan Halterman
@kuujo
Jul 18 2017 15:57
that is odd indeed… is it reading an old configuration from disk? Which members is it polling? It should show the address that it’s polling
@kerbymart
@kerbymart
Jul 18 2017 16:26
Yeah, it looks like it
Fixed when I deleted the data/logs
now getting this error
java.util.concurrent.ExecutionException: io.atomix.copycat.error.InternalException: internal Raft error
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at io.atomix.catalyst.concurrent.BlockingFuture.get(BlockingFuture.java:40)
    at com.dotweblabs.astro.AstroClient.put(AstroClient.java:70)
@kerbymart
@kerbymart
Jul 18 2017 16:37
I figured this out: it was a wrong Command import due to refactoring. With this, does it mean that when we build systems with Atomix/Copycat, refactoring is not an option anymore? Since if the data logs already have data stored and we run the server again, it expects the same object signature. Is my analysis correct, @kuujo?
Jordan Halterman
@kuujo
Jul 18 2017 16:45

It doesn't mean that at all. A major driver of refactoring in Atomix 2.0 was supporting in-service upgrades. We needed to be able to upgrade a running cluster without losing availability. So Atomix 2.0 uses Kryo serialization with support for backwards/forwards compatible reads, so old commands stored on disk can be read as new objects and new commands can be sent to old servers. It just places limitations on refactoring that mean it has to be done carefully and tested.

The other option is to do offline upgrades, which would mean writing a script to rewrite logs during an upgrade.

This is a challenge any persistent system has, and it's a vast research area. Those are the two basic approaches: online or offline, and synchronous or asynchronous.

If you just need to be able to refactor a state machine, the answer is using a serializer that supports schema evolution, which means Kryo's CompatibleFieldSerializer or Protobuf or Thrift or...

Or writing serializers in a way that can handle backwards compatibility (the fastest option). That can be done by writing fields in a specific order and writing a length prefix and then skipping remaining bytes for forwards compatibility or setting defaults for backwards compatibility.
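
For illustration, here is a minimal sketch of the schema-evolution approach using Kryo's CompatibleFieldSerializer; PutCommand is a hypothetical command class, and the field names written alongside the data are what allow old bytes to be read after fields change:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class SchemaEvolutionExample {

    // Hypothetical command: fields can be added or removed in a later
    // version and CompatibleFieldSerializer will still read old bytes,
    // skipping unknown fields and defaulting missing ones.
    public static class PutCommand {
        String key;
        byte[] value;
    }

    public static void main(String[] args) {
        Kryo kryo = new Kryo();
        // Write field names alongside values to support schema evolution.
        kryo.setDefaultSerializer(CompatibleFieldSerializer.class);
        kryo.register(PutCommand.class);

        PutCommand cmd = new PutCommand();
        cmd.key = "user:1";
        cmd.value = new byte[]{1, 2, 3};

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Output output = new Output(bytes);
        kryo.writeObject(output, cmd);
        output.close();

        // An "old" serialized command can be read back into the current class.
        Input input = new Input(new ByteArrayInputStream(bytes.toByteArray()));
        PutCommand read = kryo.readObject(input, PutCommand.class);
        System.out.println(read.key); // user:1
    }
}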
@kerbymart
@kerbymart
Jul 18 2017 17:32
okay cool, so it seems there is a workaround, so this would not be an issue in the long run
@kerbymart
@kerbymart
Jul 18 2017 18:28
Hi @kuujo is there a good way to do search with Copycat?
Jordan Halterman
@kuujo
Jul 18 2017 18:28
what do you mean?
Search of what?
@kerbymart
@kerbymart
Jul 18 2017 18:33
values
Jordan Halterman
@kuujo
Jul 18 2017 18:35
Copycat doesn’t care what state machines do. It’s just up to the state machine to implement the search of values. No reason it can’t be done
Jordan Halterman
@kuujo
Jul 18 2017 18:41
Just a matter of finding a good Java library to do it I suppose
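
For illustration, a minimal sketch of a search operation as a Copycat query, assuming Copycat 1.x's convention of state machine operations as public methods taking a Commit (SearchQuery and KvStateMachine are hypothetical names; the query type must also be serializable by the Catalyst serializer):

import io.atomix.copycat.Query;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.StateMachine;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KvStateMachine extends StateMachine {

    // Hypothetical query: find keys whose value contains a substring.
    // Queries are reads, so they don't go through the Raft log.
    public static class SearchQuery implements Query<List<String>> {
        final String term;
        public SearchQuery(String term) { this.term = term; }
    }

    private final Map<String, String> map = new HashMap<>();

    // Search is just a scan over the in-memory state machine state.
    public List<String> search(Commit<SearchQuery> commit) {
        try {
            List<String> hits = new ArrayList<>();
            for (Map.Entry<String, String> entry : map.entrySet()) {
                if (entry.getValue().contains(commit.operation().term)) {
                    hits.add(entry.getKey());
                }
            }
            return hits;
        } finally {
            commit.close(); // release the commit once the query is served
        }
    }
}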
@kerbymart
@kerbymart
Jul 18 2017 18:43
What could be the general strategy? Iterate over all the logs?
buko
@buko
Jul 18 2017 19:12
@kerbymart In the past what I've done is a combination of Lucene and in-memory/cached indexes that can, yes, be recreated by replaying the logs
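
A minimal sketch of that idea, assuming a state machine that mirrors each write into an in-memory Lucene index (class and field names are illustrative):

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.RAMDirectory;

import java.io.IOException;

// Rebuilt from scratch on restart by replaying the Copycat log, so the
// index itself never needs to be replicated or persisted.
public class ValueIndex {
    private final IndexWriter writer;

    public ValueIndex() throws IOException {
        writer = new IndexWriter(new RAMDirectory(),
            new IndexWriterConfig(new StandardAnalyzer()));
    }

    // Call from the state machine's put handler.
    public void index(String key, String value) throws IOException {
        Document doc = new Document();
        doc.add(new StringField("key", key, Field.Store.YES));
        doc.add(new TextField("value", value, Field.Store.NO));
        writer.updateDocument(new Term("key", key), doc); // upsert by key
        writer.commit();
    }

    // Call from the delete handler so removed keys drop out of search.
    public void remove(String key) throws IOException {
        writer.deleteDocuments(new Term("key", key));
        writer.commit();
    }
}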
@kerbymart
@kerbymart
Jul 18 2017 19:14
@buko I was actually thinking of Lucene also
@kuujo there's a problem with Copycat logging
I started Jetty servers with Copycat servers, then when I stopped them and tried to run them again, the server would not run
Had to delete all the logs to make it work again; this is not good for a real-world app where the logs contain actual data
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

// Astro and FreePortFinder are project classes assumed to be in the same package.
@WebListener
public class KeyValueStore implements ServletContextListener {

    public static Logger logger = Logger.getLogger(KeyValueStore.class.getName());

    private static final String DEFAULT_ADDR = "127.0.0.1";
    private static final String DEFAULT_PORT = "5000";
    private static final String DEFAULT_PATH = "\\data";

    private ExecutorService executor;
    private Astro server;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        executor = Executors.newSingleThreadExecutor();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                logger.info("Running Astro");
                // Fall back to a free port if the default is already taken.
                String port = DEFAULT_PORT;
                if(isPortUsed(DEFAULT_ADDR, Integer.parseInt(port))) {
                    int p = FreePortFinder.findFreeLocalPort();
                    port = String.valueOf(p);
                }
                String[] address = {DEFAULT_ADDR + ":" + port};
                server = new Astro(DEFAULT_PATH, address);
                server.start();
                if(server.isRunning()) {
                    logger.info("Astro running on " + port);
                } else {
                    logger.warning("Astro is not running");
                }
                // Keep the background thread alive while the server runs.
                while(server.isRunning()) {
                    try {
                        logger.info("Astro is running...");
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        executor.submit(runnable);
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        server.shutdown();
        executor.shutdown();
    }

    // Returns true if something is already listening on the given port.
    private boolean isPortUsed(String hostName, int portNumber) {
        try {
            Socket s = new Socket(hostName, portNumber);
            s.close();
            return true;
        } catch(Exception e) {
            return false;
        }
    }
}
This is the actual code that runs in every servlet container I am testing now.
every time a new server is spawned, it will run a Copycat server
Jordan Halterman
@kuujo
Jul 18 2017 19:21
I looked at the code a bit. Are all these nodes then calling bootstrap with a single node in the configuration?
@kerbymart
@kerbymart
Jul 18 2017 19:21
Here's the start code:
    public void start() {
        if(server.isRunning()) {
            logger.info("Server running already");
        } else {
            logger.info("Server is not running!");
            server.bootstrap(members).join();
            server.cluster().join(members).join();
        }
    }
Jordan Halterman
@kuujo
Jul 18 2017 19:22
And are they all sharing the same log directory on the same physical node?
@kerbymart
@kerbymart
Jul 18 2017 19:22
right now on my local machine, yes, but when deployed to the cloud servers they will certainly have different logs as they will be on different machines
is this the reason?
Jordan Halterman
@kuujo
Jul 18 2017 19:25

Okay so, here are the restrictions on starting the cluster...

The cluster needs to be bootstrapped only one time either with a bootstrap() call on the first node or a bootstrap(members) call. This is the initial configuration and must be fixed. Additional nodes are added via join(someBootstrappedNodesThroughWhichToJoin)

All servers must either use a separate log directory or a separate unique server name, otherwise multiple servers using the same directory will read and write the same log files and you'll get crazy behavior

The two valid ways to start a cluster are:
bootstrap one node and join the rest to that node or
bootstrap all nodes
The cluster configuration mechanisms are a product of strong consistency requirements. The cluster needs a fixed view of the nodes in the cluster that cannot be eventually consistent. If all the nodes just attempted to join a cluster or form one if it didn't exist, we'd risk split brain since a partition can make a joining node think no cluster exists. That's why we have to separate initial nodes from added nodes
To say that it's safe to form a new cluster
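For illustration, a minimal sketch of both startup paths against the Copycat 1.x server API (the addresses and log directory are assumptions, KvStateMachine is the hypothetical state machine sketched earlier, and NettyTransport comes from the catalyst-netty module):

import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.netty.NettyTransport;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;

import java.io.File;
import java.util.Arrays;
import java.util.List;

public class ClusterStartup {
    public static void main(String[] args) {
        Address local = new Address(args[0], Integer.parseInt(args[1]));
        List<Address> cluster = Arrays.asList(
            new Address("10.0.0.1", 5000),
            new Address("10.0.0.2", 5000),
            new Address("10.0.0.3", 5000));

        CopycatServer server = CopycatServer.builder(local)
            .withStateMachine(KvStateMachine::new) // hypothetical state machine
            .withTransport(new NettyTransport())
            // every server needs its own log directory (or a unique name)
            .withStorage(Storage.builder()
                .withDirectory(new File("logs/" + local.port()))
                .build())
            .build();

        if (args.length > 2 && args[2].equals("bootstrap")) {
            // Path 1: bootstrap forms a brand-new cluster; do this only once,
            // either on one node alone or with the full initial membership.
            server.bootstrap(cluster).join();
        } else {
            // Path 2: join an existing, already-bootstrapped cluster.
            server.join(cluster).join();
        }
    }
}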
@kerbymart
@kerbymart
Jul 18 2017 19:30
Okay, so in our servlet configuration we need to make sure we have all the IPs of the members of the cluster, say in a properties file
Perhaps I should share our topology to explain why I put that kind of code in the Astro KV
image.png
The reason I was thinking of having a single servlet type is that we have an elastic setup where we can add or remove nodes dynamically using a dashboard, which also means that all these nodes basically have the same WAR footprint
@kerbymart
@kerbymart
Jul 18 2017 19:41
As for "All servers must either use a separate log directory or a separate unique server name" -- yes, in our case this will be true, although I tested earlier putting/getting 1M records with 4 nodes running on the same machine with the same log path and it still worked without any problem.
Hi @kuujo, when you said "bootstrap all nodes", does it mean run all the servers at the same time, or bootstrap in a way that the members/addresses are the same for all nodes that will run? In a real-world scenario like the screenshot I posted, when we restart Tomcat, the other Tomcat nodes restart with a 60-second delay.
Jordan Halterman
@kuujo
Jul 18 2017 19:45
Call bootstrap(members) on all the nodes at the "same" time. If I call bootstrap(members) with 3 nodes, then that will form a 3 node cluster, so the other two nodes have to do the same. For more dynamic clustering just bootstrap() one node and join() the others
In other words, when you call bootstrap you're creating a new cluster, so whichever nodes are in the cluster at that time need to be bootstrapped. But it's also valid to create a 1-node cluster and add nodes to it later
You might have gotten 4 1-node clusters that were only appending to the log and thus not causing any errors because the logs weren't being read. Calling bootstrap() on 4 nodes will likely do that
@kerbymart
@kerbymart
Jul 18 2017 19:48
So in our real-world case here, we have 4 Tomcat servers which bootstrap Copycat when the servlet runs, so the members to be bootstrapped are the IPs of the 4 Tomcat servers?
Jordan Halterman
@kuujo
Jul 18 2017 19:59
Yep
Jordan Halterman
@kuujo
Jul 18 2017 20:05
In that scenario, at least 3 of the nodes need to be bootstrapped before any bootstrap call will return. During the bootstrap call, the configuration is set up, a leader is elected, and any existing logs are replayed before it returns.
Wait that last point is only in Atomix 2
The cluster just needs to be able to elect a leader
@kerbymart
@kerbymart
Jul 18 2017 20:21
@kuujo can Copycat be transactional?
Jordan Halterman
@kuujo
Jul 18 2017 20:23
Sure, we use transactions. On a single cluster, the wise way to do transactions would be optimistic locking: read value versions using Raft log indexes, aggregate everything within a transaction into a single command, send the indexes for reads, validate that the data has not changed between the read and the write, and commit the changes if not. In ONOS we use two-phase commit to do cross-partition transactions.
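
A minimal sketch of that optimistic scheme, with hypothetical OptimisticTransaction and Versioned types; the versions here stand in for Raft log indexes:

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical optimistic transaction, batched into a single command:
// reads carry the version (log index) the client observed, and the state
// machine commits the writes only if none of those values have changed.
public class OptimisticTransaction implements Serializable {
    final Map<String, Long> readVersions = new HashMap<>(); // key -> version read
    final Map<String, String> writes = new HashMap<>();     // key -> new value

    static final class Versioned {
        final String value;
        final long version; // e.g. the Raft log index of the last write
        Versioned(String value, long version) { this.value = value; this.version = version; }
    }

    // Applied inside the state machine as one command, so validation and
    // commit are atomic with respect to all other operations.
    static boolean apply(Map<String, Versioned> store, OptimisticTransaction tx, long commitIndex) {
        for (Map.Entry<String, Long> read : tx.readVersions.entrySet()) {
            Versioned current = store.get(read.getKey());
            long currentVersion = current == null ? 0L : current.version;
            if (currentVersion != read.getValue()) {
                return false; // read-write conflict: abort the transaction
            }
        }
        for (Map.Entry<String, String> write : tx.writes.entrySet()) {
            store.put(write.getKey(), new Versioned(write.getValue(), commitIndex));
        }
        return true; // all reads still valid: commit
    }
}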
@kerbymart
@kerbymart
Jul 18 2017 20:24
Okay so it means the Copycat library is not transactional at the moment
Jordan Halterman
@kuujo
Jul 18 2017 20:25
No. Copycat never will be. It’s just a library for building replicated state machines. Transactions are an implementation detail of state machines. But ONOS’s transactions will be in Atomix 2.0
err… I guess it’s a low level implementation of the Raft protocol. Transactions are too high level
doing efficient transactions requires understanding the implementation details of state machines
specifically, what constitutes a read-write conflict
or write-write conflict for that matter
we could do transactional writes, but that’s only half the problem of transactions
that’s just a matter of batching commands into a single command
but that’s more about making them atomic. The real problem with transactions is reads