These are chat archives for atomix/atomix

2nd
Apr 2016
Jordan Halterman
@kuujo
Apr 02 2016 03:46
The way to send an atomic broadcast to a group will be something like this:
MessageProducer.Options options = new MessageProducer.Options()
  .withDelivery(Delivery.ATOMIC_BROADCAST);

MessageProducer<String> producer = group.messages().producer("foo");
producer.send("Hello world!").thenRun(() -> {
  // All members acknowledged their messages
});
Jordan Halterman
@kuujo
Apr 02 2016 03:54
ATOMIC_RANDOM
ATOMIC_ROUND_ROBIN
ATOMIC_BROADCAST
etc
Grant Haywood
@cinterloper
Apr 02 2016 05:18
oh that's pretty cool, i actually need ATOMIC_RANDOM for asking a subset of peers to perform an external replication task
Jordan Halterman
@kuujo
Apr 02 2016 05:46
:+1: should have it merged tonight I think
Jordan Halterman
@kuujo
Apr 02 2016 10:05

Send a message to a random member of a group, awaiting acknowledgement of the message

MessageProducer.Options options = new MessageProducer.Options()
  .withConsistency(MessageProducer.Consistency.ATOMIC)
  .withDispatchPolicy(MessageProducer.DispatchPolicy.RANDOM);

MessageProducer<String> producer = group.messages().producer("foo", options);
producer.send("Hello world!").thenRun(() -> {
  // A random member acknowledged the message
});

Broadcast a message to all members of a group, awaiting persistence of the message

MessageProducer.Options options = new MessageProducer.Options()
  .withConsistency(MessageProducer.Consistency.SEQUENTIAL)
  .withDispatchPolicy(MessageProducer.DispatchPolicy.BROADCAST);

MessageProducer<String> producer = group.messages().producer("foo", options);
producer.send("Hello world!").thenRun(() -> {
  // The message will eventually be received by all members of the group.
});

Send a direct message to a member of a group, awaiting acknowledgement of the message

MessageProducer.Options options = new MessageProducer.Options()
  .withConsistency(MessageProducer.Consistency.ATOMIC);

Member member = group.member("foo");
MessageProducer<String> producer = member.messages().producer("foo", options);
producer.send("Hello world!").thenRun(() -> {
  // Member "foo" acknowledged the message
});

Send a direct message to a member of a group, awaiting persistence of the message

MessageProducer.Options options = new MessageProducer.Options()
  .withConsistency(MessageProducer.Consistency.SEQUENTIAL);

Member member = group.member("foo");
MessageProducer<String> producer = member.messages().producer("foo", options);
producer.send("Hello world!").thenRun(() -> {
  // Member "foo" will eventually receive the message
});
Richard Pijnenburg
@electrical
Apr 02 2016 10:06
Hiya @kuujo
Jordan Halterman
@kuujo
Apr 02 2016 10:09
indeed
Richard Pijnenburg
@electrical
Apr 02 2016 10:10
missing the dispatch settings in the last 2 examples
??
Jordan Halterman
@kuujo
Apr 02 2016 10:11
not required
Richard Pijnenburg
@electrical
Apr 02 2016 10:11
ah okay :-)
Jordan Halterman
@kuujo
Apr 02 2016 10:11
those are direct messages so that option doesn’t make sense
Richard Pijnenburg
@electrical
Apr 02 2016 10:11
i see
Jordan Halterman
@kuujo
Apr 02 2016 10:12
ugh gitter’s markdown won’t let me create line breaks
Richard Pijnenburg
@electrical
Apr 02 2016 10:12
lol
Jordan Halterman
@kuujo
Apr 02 2016 10:14
I think I need to change Consistency to something else with SYNC and ASYNC
technically everything is atomic
it’s Atomix after all
Richard Pijnenburg
@electrical
Apr 02 2016 10:15
yeah indeed
Jordan Halterman
@kuujo
Apr 02 2016 10:15
naming is the hardest part of programming
Jordan Halterman
@kuujo
Apr 02 2016 10:34
Guess I'll have to fight the naming battles in the morning
Richard Pijnenburg
@electrical
Apr 02 2016 11:08
haha yeah. Talk to you later bud.
Richard Pijnenburg
@electrical
Apr 02 2016 17:55
Hmm, can i use a ScheduledExecutorService for tasks that have to run at intervals? like for metric collection?
Jordan Halterman
@kuujo
Apr 02 2016 20:30
Sure. It's the same as a regular Executor, but the callback is associated with a time interval. It goes on a queue, and when that time comes it's pulled off and run in the thread pool, then stuck back on the queue if it's scheduled at a fixed rate.
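A minimal sketch of that with plain java.util.concurrent, where collectMetrics() stands in for a hypothetical metrics task:

// Schedule a periodic metrics task on a small thread pool
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

// Runs every 10 seconds; after each run the task goes back on the queue
// because it's scheduled at a fixed rate
scheduler.scheduleAtFixedRate(() -> collectMetrics(), 0, 10, TimeUnit.SECONDS);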
Richard Pijnenburg
@electrical
Apr 02 2016 20:40
Ahh okay, i see
Jordan Halterman
@kuujo
Apr 02 2016 20:41
Just gotta make sure what you're doing is thread safe :-)
Richard Pijnenburg
@electrical
Apr 02 2016 20:42
haha yeah
Jordan Halterman
@kuujo
Apr 02 2016 20:42
synchronized and volatile
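For example, a tiny sketch of fields on a hypothetical metrics collector that scheduled tasks update from multiple threads:

// volatile makes the latest value of the flag visible across threads
volatile boolean running = true;

// synchronized keeps concurrent increments from racing
final Object lock = new Object();
long eventCount = 0;

void recordEvent() {
  synchronized (lock) {
    eventCount++;
  }
}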
Richard Pijnenburg
@electrical
Apr 02 2016 20:57
With the executors, i guess i would need one to push an event to and one to get the value and send it to the next plugin?
Jordan Halterman
@kuujo
Apr 02 2016 21:00
Each component can have its own executor or they can all share an executor and a thread pool. When one component is done doing whatever it's doing with its input, it uses the next component's executor to send the output to the correct thread and run the next component. Alternatively, the component just calls the next component, and the next component uses its own executor to start doing stuff on another thread.
Richard Pijnenburg
@electrical
Apr 02 2016 21:01
Hmm. not sure i completely follow
Jordan Halterman
@kuujo
Apr 02 2016 21:41
An Executor is just a queue. The first component does something and puts its output on the second component's queue. The second component does something and puts its output on the third component's queue. 1->2->3. Each -> is a separate Executor associated with the next component
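Roughly like this, where Event, filter, output and event are hypothetical stand-ins for the plugin stages:

// Each component owns a single-threaded Executor, which is effectively its queue
ExecutorService filterExecutor = Executors.newSingleThreadExecutor();
ExecutorService outputExecutor = Executors.newSingleThreadExecutor();

// 1 -> 2: the input component puts its output on the filter component's queue
filterExecutor.execute(() -> {
  Event filtered = filter.apply(event);
  // 2 -> 3: the filter puts its output on the output component's queue
  outputExecutor.execute(() -> output.send(filtered));
});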
Richard Pijnenburg
@electrical
Apr 02 2016 21:43
okay. think i understand. the advantage of using this over normal queues is that i don't have to keep certain things running in a loop like the filters? they just get executed whenever something is in the executor queue?
Jordan Halterman
@kuujo
Apr 02 2016 21:51
they’re likely much faster than any queueing you can do yourself
Richard Pijnenburg
@electrical
Apr 02 2016 21:51
I see, okay
so the big decision i have to make is how to handle the pooling i guess. how to pool the threads: do it per plugin, or per group, like all filters sharing the same thread pool
Jordan Halterman
@kuujo
Apr 02 2016 21:52
and it’s an abstraction
Richard Pijnenburg
@electrical
Apr 02 2016 21:57
i guess data loss might occur if there are things in the queue and it crashes?
Jordan Halterman
@kuujo
Apr 02 2016 21:58
you can configure how to handle exceptions
Richard Pijnenburg
@electrical
Apr 02 2016 21:58
Because of that i want to use the raft log for storing the original event and being able to replay it through the pipeline.
Hmm yeah.
but if it OOMs then it's lost by definition
or if a plugin throws an exception.
Jordan Halterman
@kuujo
Apr 02 2016 22:01
That’s the case in any system. Data that can’t be lost has to be persisted somewhere. The easiest and most performant thing to do is to be able to replay it from its source. There always has to be some sort of acknowledgement mechanism. Even if data is being pushed to the pipeline, whatever is pushing it needs acknowledgement that the data was received before it can assume it’s processed. You don’t necessarily have to persist anything internally anywhere. Just use acknowledgements to say whether something was or was not processed and therefore can or cannot be safely removed from some existing persistence.
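A rough sketch of that pattern on the producing side, with hypothetical Event, pipeline and scheduler types: hold each event until it's acked, and replay anything that was never acknowledged from the source:

// Track events that have been pushed but not yet acknowledged
Map<Long, Event> pending = new ConcurrentHashMap<>();

// Push an event and only forget it once the pipeline acks it
pending.put(event.id(), event);
pipeline.send(event).thenRun(() -> pending.remove(event.id()));

// Anything still pending after a timeout can be replayed from the source
scheduler.schedule(() -> pending.values().forEach(pipeline::send), 30, TimeUnit.SECONDS);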
Richard Pijnenburg
@electrical
Apr 02 2016 22:02
true. but sometimes it's not possible. Like syslog messages, those just push messages.
Jordan Halterman
@kuujo
Apr 02 2016 22:02
presumably, they don't just push messages and then hope that they made it through, right?
because if they do then the system’s already not fault tolerant
regardless of what you do when the data gets to the pipeline
Richard Pijnenburg
@electrical
Apr 02 2016 22:03
I’m afraid with logging the sending system doesn’t always care about what we think it should do :-)
Also persisting it in the raft log and only evicting it if our end destination has received it allows us to replay it if there is a config failure
for example bad grok filter
can build a DLQ or something like that
Also want to build statistics in there as well
Jordan Halterman
@kuujo
Apr 02 2016 22:10
Well… there's not much reason to make data fault tolerant once it gets into the system if it's never fault tolerant before it gets into the system. Ultimately, the system's only as fault tolerant as its least fault tolerant piece. The problem is, if the thing that is pushing data into the system doesn't care about feedback from the system - even if that feedback is just an immediate response to the push - then there's no way to say that anything was successful because you don't know what you don't know. In other words, if some data is lost on the way from the source to the pipeline, it's just lost. If the system that's pushing data to the pipeline can handle a request and response (ack) then fault tolerance can be achieved. But without that, even if data arrives at the system it's not immediately made fault tolerant until it's replicated.
Richard Pijnenburg
@electrical
Apr 02 2016 22:11
very true. a lot of inputs will do some kind of acking, and i want to make sure at least this piece won't lose messages
Jordan Halterman
@kuujo
Apr 02 2016 22:13
Well… if an input can do acks then data doesn’t necessarily need to be persisted. The fastest way to make the system fault tolerant will always be to support replay from the data’s source through acks. If the pipeline is cheap (takes on the order of milliseconds to complete processing of one item) then fault tolerance within the pipeline will only add significant expense to that process
Richard Pijnenburg
@electrical
Apr 02 2016 22:14
Hmm true
Jordan Halterman
@kuujo
Apr 02 2016 22:15
Atomix can be used to do the fault tolerance if it’s necessary, but for scalability everything can’t go through Raft. Consensus is designed for decision making rather than general fault tolerance. So, the idea is that you help make decisions about where data goes for fault tolerance. For example, if some system is pushing to a pipeline, Raft is used to say where that data goes. In the event of a failure, Raft is used to consistently decide what changes to the flow of data need to be made. Linearizability ensures that the entire cluster sees changes to the structure of the cluster at the same logical time so no data is lost. But for scalability, multiple inputs have to push to multiple nodes, and those multiple nodes need to replicate to multiple replicas independently of one another. Raft forces everything through a single node which severely limits performance.
It could be done efficiently in Raft with sharding though, which I’m really interested in
Richard Pijnenburg
@electrical
Apr 02 2016 22:17
If i only use it for storing the original event and delete it once it's sent off, would that be efficient enough?
Richard Pijnenburg
@electrical
Apr 02 2016 22:22
Hmm. and i also haven't solved the worker queue / transport between nodes.
Jordan Halterman
@kuujo
Apr 02 2016 22:23

The problem is, it would limit the scalability of an entire cluster - even if it’s 1000 nodes - to something like 10k/sec. Every time you store an event, it gets sent to the leader and replicated to n followers. Every time you delete an event it gets sent to the leader and replicated to followers. In the same 1000 node cluster, you could get probably hundreds of times that performance by using Raft to coordinate the cluster to replicate events rather than pushing them through Raft itself.

// Create a group for each pipeline
DistributedGroup group = atomix.getGroup("cluster").get();

// Join the group
LocalMember localMember = group.join().get();

// Elect a leader for the group
group.election().onElection(term -> {
  Member leader = term.leader();
  // If the local node is the leader, start a server to listen for connections
  if (localMember.equals(leader)) {
    Server server = atomix.transport().server();
    server.listen(new Address("localhost", 1234), connection -> {
      connection.onMessage(message -> {
        // When a message is received, replicate it to all the other members of the group
        group.members().forEach(member -> {
          // Replicate the message
        });
        // Once the message has been replicated, respond to the original message to ack it
      });
    });
  }
});

This is the scalable way to do fault tolerance. The cluster is partitioned into multiple groups. Each group represents a single partition of a pipeline. When data is pushed to the pipeline, it’s pushed to the leader. When the leader receives the data, it replicates it to the rest of the members of the group and acks the message. Then it continues on to do its thing (send the data through the pipeline)

In other words, whereas Raft has a single leader, you create linear scalability by creating a bunch of groups within the cluster and electing a leader for each group through Raft. Each group represents a set of replicas. Data sent to the group is sent to the leader, and the leader replicates to the other members of the group for fault tolerance then continues on processing. Once data is processed, the leader sends another message to other members of the group telling them it’s safe to drop the data.
Richard Pijnenburg
@electrical
Apr 02 2016 22:27
The clients don’t store state right? do deletes and stuff still get replicated to them?
Jordan Halterman
@kuujo
Apr 02 2016 22:27
The only way to create true scalability is through this type of partitioning. Effectively, you’re using Raft to do the hard parts of creating a bunch of smaller groups in the cluster that use a replication pattern very similar to Raft
yeah clients can store state if it’s external to the group
this is how the Orwell project was supposed to be done
just need to find the time to do it
maybe I should throw that together
that would be a thorough demo of scalable replication using DGroup
Richard Pijnenburg
@electrical
Apr 02 2016 22:29
i mean if it's just a client, not a replica... only replicas have state right?
Jordan Halterman
@kuujo
Apr 02 2016 22:30
Replicas have Raft-related state. AtomixClient can still use DistributedGroup just like a replica though. So, you could do all of the above on a client too. Clients just interact with the group remotely, but that's transparent to the user
if that makes sense
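For example, assuming client is an AtomixClient that has already connected to the cluster, the same group APIs from above apply:

// A client holds no Raft state, but it can use DistributedGroup remotely
DistributedGroup group = client.getGroup("cluster").get();

// Join the group and participate like any other member
LocalMember member = group.join().get();

// Produce group messages remotely just like a replica would
MessageProducer<String> producer = group.messages().producer("foo");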
Richard Pijnenburg
@electrical
Apr 02 2016 22:30
Ahh yeah okay
Jordan Halterman
@kuujo
Apr 02 2016 22:30
sort of different levels of state I suppose
there’s the Raft state, but in this case I’m talking about storing state at a higher level than that, so whether it’s a client or replica doesn’t really impact it
Richard Pijnenburg
@electrical
Apr 02 2016 22:31
but if i store data, then only the replicas have to store it... so i can have 3 or 5 master nodes (replicas) and multiple clients.
Richard Pijnenburg
@electrical
Apr 02 2016 22:44
So storing events only when there is an exception would be wiser, and then provide some way of managing those entries
having built-in recovery would be important since i can't assume that plugins will handle that correctly.
Jordan Halterman
@kuujo
Apr 02 2016 22:56
Yeah... It's complicated dealing with lots of disparate systems with varying features
Richard Pijnenburg
@electrical
Apr 02 2016 22:56
hehe yeah
I also want to make sure the whole system is pluggable for people, so they can build their own plugins