These are chat archives for atomix/atomix

14th Mar 2016
Jordan Halterman
@kuujo
Mar 14 2016 02:00
weird… I just realized Copycat rc3 never made it to Maven Central. :-( Going to be pushing another release tonight
it was, like, the middle of the night when I did that. Maybe I pressed drop instead of release :-P
Jordan Halterman
@kuujo
Mar 14 2016 02:52
k I just released Copycat rc4 now :-) Hopefully that one makes it
Richard Pijnenburg
@electrical
Mar 14 2016 18:01
@kuujo did something break in atomix-rc3?
Caused by: java.lang.NullPointerException: members cannot be null
    at io.atomix.catalyst.util.Assert.notNull(Assert.java:60)
    at io.atomix.copycat.server.storage.system.Configuration.<init>(Configuration.java:42)
    at io.atomix.copycat.server.storage.system.MetaStore.loadConfiguration(MetaStore.java:138)
    at io.atomix.copycat.server.state.ClusterState.<init>(ClusterState.java:71)
    at io.atomix.copycat.server.state.ServerContext.<init>(ServerContext.java:102)
    at io.atomix.copycat.server.CopycatServer$Builder.build(CopycatServer.java:971)
    at io.atomix.AtomixReplica$Builder.buildServer(AtomixReplica.java:901)
    at io.atomix.AtomixReplica$Builder.build(AtomixReplica.java:937)
worked fine with atomix-rc2
I know @jhalterman changed some things as well. perhaps i missed something that i need to change :-)
Jordan Halterman
@kuujo
Mar 14 2016 18:43
Lemme poke around. Nothing should have changed there...
According to the stack trace, it's trying to load the configuration (List of members) but it's null. Seems like it's reading a meta file from disk.
Which is where the configuration is stored
Richard Pijnenburg
@electrical
Mar 14 2016 19:05
Hmm. I'm pretty sure I removed everything but will check. At a pub now :)
@kuujo you were right
seems it didn’t like to carry over the data from rc2 to rc3
Richard Pijnenburg
@electrical
Mar 14 2016 19:11
did something change in the format of the storage file?
Jordan Halterman
@kuujo
Mar 14 2016 19:15
I was trying to figure that out but got sidetracked :-P
Richard Pijnenburg
@electrical
Mar 14 2016 19:16
hehe okay :p
Jordan Halterman
@kuujo
Mar 14 2016 19:18
Hmm I don't see anything that should have caused that.
Ahh I think I know
Added a timestamp to configurations, that's right
This is what sucks about managing databases. Gonna have to start providing upgrade scripts at some point
Richard Pijnenburg
@electrical
Mar 14 2016 19:19
hehe yeah
i thought there was always a timestamp in the log?
Jordan Halterman
@kuujo
Mar 14 2016 19:20
When the binary format changes
This one was added to configuration files. The meta file stores only the term, vote, and configuration. Just added a timestamp to that too.
Richard Pijnenburg
@electrical
Mar 14 2016 19:21
Ah i see. okay
Hmm. will have to build a class or something so i can build a settings tree and pass it on.
Trying to wrap my head around how to pass on settings from atomix to another thread that handles the pipelines
Still unsure anyway how to build up the pipeline. lol
especially with having multiple threads doing the same thing
Jordan Halterman
@kuujo
Mar 14 2016 19:25
You should use the Executor framework, much easier than straight threads. It does the queuing and stuff for you. That's what Catalyst's ThreadContext wraps
Richard Pijnenburg
@electrical
Mar 14 2016 19:25
Ah okay. I expected you to have some great idea ;-)
Jordan Halterman
@kuujo
Mar 14 2016 19:26
executor.execute(() -> { /* runs in some other thread */ });
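A minimal sketch of that pattern, assuming plain java.util.concurrent (the task body is just a placeholder):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// A single-threaded executor behaves like a small event loop: tasks are queued
// and run one after another, in submission order, on its single worker thread.
ExecutorService eventLoop = Executors.newSingleThreadExecutor();
eventLoop.execute(() -> System.out.println("running on the event loop thread"));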
Richard Pijnenburg
@electrical
Mar 14 2016 19:27
oh handy :-)
then i just have to string a few of those together for each thing
Jordan Halterman
@kuujo
Mar 14 2016 19:27
Copycat uses a bunch of single threaded executors as event loops. Or you can use a thread pool
Richard Pijnenburg
@electrical
Mar 14 2016 19:28
it defo has to be multi threaded. for example i want to say 'this filter or filters have x threads/core to run on'
Jordan Halterman
@kuujo
Mar 14 2016 19:29
That should be easy. Executor per filter. Executor executor = Executors.newFixedThreadPool(numThreads) or whatever
Richard Pijnenburg
@electrical
Mar 14 2016 19:29
okay :-)
and what if i want to say: the filters for this pipeline (which is meant to be separate anyway) are sharing x threads? would that be a thread group or something?
Jordan Halterman
@kuujo
Mar 14 2016 19:30
Ahh... So, Catalyst does something like this...
Basically, you create a single thread pool executor and for each filter create a different ThreadPoolContext. That context has a queue that's separate from the executor queue. Essentially, it allows multiple contexts to share the same thread pool while, in Copycat/Atomix's case, each context only executes on one thread at a time
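A rough sketch of that idea with plain java.util.concurrent (not Catalyst's actual ThreadPoolContext code): each filter gets its own serial context, every context borrows threads from one shared pool, and a context never runs more than one task at a time.
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
// One logical context per filter: its tasks run in submission order, one at a
// time, while all contexts share the threads of the same backing pool.
class SerialContext implements Executor {
  private final Executor pool;                      // the shared thread pool
  private final Queue<Runnable> tasks = new ArrayDeque<>();
  private Runnable active;
  SerialContext(Executor pool) { this.pool = pool; }
  public synchronized void execute(Runnable task) {
    tasks.add(() -> {
      try { task.run(); } finally { next(); }
    });
    if (active == null) next();
  }
  private synchronized void next() {
    if ((active = tasks.poll()) != null) pool.execute(active);
  }
}
Each filter would then get its own new SerialContext(sharedPool), with sharedPool being something like Executors.newFixedThreadPool(n).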
Richard Pijnenburg
@electrical
Mar 14 2016 19:33
ahh okay. i see
Jordan Halterman
@kuujo
Mar 14 2016 19:33
But if they don't have that requirement then just share the same executor between them
Richard Pijnenburg
@electrical
Mar 14 2016 19:34
the whole pipeline part should share the same queue yeah
Jordan Halterman
@kuujo
Mar 14 2016 19:34
Copycat/Atomix just make everything single threaded at some point since things have to be ordered in a specific way, and the easiest way to do that is a single thread
Richard Pijnenburg
@electrical
Mar 14 2016 19:35
what i eventually also want to do is like parallel filtering. for example if i have 3 filters. but the result of each individual filter has no impact on the input of the next filter. would need to do some weird queueing then i guess.
maybe for a future version lol
yeah. there are only a few things in logstash that would need to be single threaded like multi line crap
Jordan Halterman
@kuujo
Mar 14 2016 19:36
It's still multi threaded, it just carefully coordinates those threads. Things happen in order on one thread and then happen in the same order on another thread. For example, it writes 5 entries to the log on one thread, replicates them in that order on another thread, then applies them to the state machine in that order on another thread. The Executor handles the ordering and the code is just deterministic
Indeed
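A rough sketch of that hand-off pattern (not Copycat's actual code): each stage is a single-threaded executor, so entries pass through every stage in the same order even though the stages run on different threads.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
ExecutorService logThread = Executors.newSingleThreadExecutor();
ExecutorService replicateThread = Executors.newSingleThreadExecutor();
ExecutorService applyThread = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
  logThread.execute(() -> {
    // 1. write the entry to the log on the log thread
    replicateThread.execute(() -> {
      // 2. replicate the entry, still in log order, on the replication thread
      applyThread.execute(() -> {
        // 3. apply the entry to the state machine, in the same order again
      });
    });
  });
}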
Richard Pijnenburg
@electrical
Mar 14 2016 19:37
also would need to build something that allows me to flush the executor threads so i can rebuild the pipeline if they change the config
and maybe pause some threads so they stop processing
all the plugins will have to implement the same interface, like initialize (set up the filter), shutdown (flush it) and run (run the filter action)
would be nice if i can initialize all of them once and then do the run command multiple times
some init functions can be expensive. like reading the database file of geoip filter
Richard Pijnenburg
@electrical
Mar 14 2016 19:43
Would be nice if i can share the resource of that file across the threads as well
ah. the executor stuff has a shutdown part built in
just have to make sure it doesn’t try to process any new things
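A small sketch of that, assuming the pipeline holds a plain ExecutorService (the 30-second timeout is arbitrary):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
// Stop accepting new work, let the queued tasks drain, then rebuild the pipeline.
static void drainAndStop(ExecutorService executor) throws InterruptedException {
  executor.shutdown();                                  // rejects new submissions
  if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
    executor.shutdownNow();                             // interrupt anything still running
  }
}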
Jordan Halterman
@kuujo
Mar 14 2016 19:45
I never did totally elaborate on the faster way to do reliable messaging. There are a couple ways this can be done. First is to just use the MEMORY storage level, in which case writing to a 3-node cluster requires a single batch request to a second node for replication. But the other way this will be done in Atomix post-1.0 is by wrapping DistributedGroup. Essentially, the problem with going through Raft is its strict requirements for consistency. Writes have to be synchronously replicated to and persisted on a majority of the cluster. The other issue is that Atomix does not support sharding. But DGroup was developed to resolve this issue...
DGroup now has a partitioning API that aids in partitioning members of the cluster, routing requests to specific nodes, and replicating data. What that means for scaling is you partition the source data into n partitions and create a PartitionGroup. Instead of pushing writes through the Raft cluster, you split the data and write each partition to a GroupPartition which represents some subset of the members in the group. The first member in the group is the primary replica, and the rest are backups for fault tolerance.
Atomix' DGroup does consistent hashing internally to map partitions to members. So, if a node crashes, some other node will take over the partitions assigned to that node.
TBH this should really be implemented in Atomix itself. We should create a fast messaging resource or a version of DistributedTaskQueue that partitions data
Richard Pijnenburg
@electrical
Mar 14 2016 19:49
Ah i think i understand it.. so if i have 3 nodes, i'll have 3 partitions, and each group can have another primary node?
Jordan Halterman
@kuujo
Mar 14 2016 19:50
That's ultimately the plan anyways
Indeed
So, if you have 3 nodes and 3 partitions, one partition might go to node A and B, one to B and C, and one to C and A
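A toy sketch of that layout (illustrative only; Atomix would handle the assignment internally):
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
// Walk the node list so partition 0 -> [A, B], 1 -> [B, C], 2 -> [C, A];
// the first member of each list is the primary, the rest are backups.
List<String> nodes = Arrays.asList("A", "B", "C");
int partitionCount = 3;
int replicationFactor = 2;
for (int p = 0; p < partitionCount; p++) {
  List<String> replicas = new ArrayList<>();
  for (int r = 0; r < replicationFactor; r++) {
    replicas.add(nodes.get((p + r) % nodes.size()));
  }
  System.out.println("partition " + p + " -> " + replicas);
}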
Richard Pijnenburg
@electrical
Mar 14 2016 19:50
would that in theory increase the throughput? because eventually all 3 nodes will have to write the data. no matter which one is primary, the replicas will also do a write
Jordan Halterman
@kuujo
Mar 14 2016 19:51
If node A crashes, Atomix will reassign partitions so they're all on nodes B and C
That's how you scale "linearly"
Richard Pijnenburg
@electrical
Mar 14 2016 19:51
i see
Jordan Halterman
@kuujo
Mar 14 2016 19:52
You can have a cluster much larger than three nodes. You could have a 6 node cluster where none of the partitions overlap. And even in a three node cluster, writes between partitions are concurrent, so there are still significant performance benefits
This is exactly how topics in Kafka work
Richard Pijnenburg
@electrical
Mar 14 2016 19:52
since most filters are cpu bound and not disk bound, would it make sense to have each node be a replica node?
the only issue i can imagine is managing all those partitions.
Jordan Halterman
@kuujo
Mar 14 2016 19:56
Yeah that's the PITA, but that can be done inside Atomix as that's already the plan anyways. The hard part is when the cluster changes (a node crashes) you really need to replicate state from the remaining live replica to a new backup. But Copycat/Atomix are close to being able to do that internally now.
Richard Pijnenburg
@electrical
Mar 14 2016 19:57
i see. okay
Jordan Halterman
@kuujo
Mar 14 2016 19:57
Ugh phone
Richard Pijnenburg
@electrical
Mar 14 2016 19:58
I also still have the idea of using raft for a consistency store: only store the original event instead of using it for transport, so that if the pipeline fails or whatever, i still have the original event and can replay it
and use something else for buffering / transport
Jordan Halterman
@kuujo
Mar 14 2016 19:59
Hmm... That seems like possibly a good idea
Richard Pijnenburg
@electrical
Mar 14 2016 20:00
using a multi threaded send/receive part with acking from one buffer to another.
with batching most likely
Jordan Halterman
@kuujo
Mar 14 2016 20:01
Bah gotta do some work
Richard Pijnenburg
@electrical
Mar 14 2016 20:01
np bud. thank you for thinking with me :-)
Jordan Halterman
@kuujo
Mar 14 2016 20:01
Later
Richard Pijnenburg
@electrical
Mar 14 2016 22:03
@kuujo for when you have time. what do you think of https://github.com/OpenHFT/Chronicle-Queue ?
Jordan Halterman
@kuujo
Mar 14 2016 22:44
@electrical that library is for interprocess communication. It does apparently do replication, but it wouldn't be fault tolerant and it doesn't seem to even elaborate on what sort of guarantees you could assume. For example, if you write a message to a queue on one node and then that node crashes, can you expect the other node to have that message? Also, this might scale up, but not out. You still have to do some sort of partitioning to achieve real scalability, so it doesn't necessarily get you a lot in that sense.
Basically, it does messaging using shared off heap memory
Richard Pijnenburg
@electrical
Mar 14 2016 22:50
ah okay. guess i understood its concepts wrong :-)
i thought it could be used at least for a fast queue with separate process for inter node communication
and the idea was to have a queue per pipeline part anyway on each node
localized queues
not a central one
but that might make things harder as well
Jordan Halterman
@kuujo
Mar 14 2016 23:15
well that is how you scale
that’s what I mean by partitioning
completely separate queues
but the challenge is splitting data in a meaningful way
records relate to each other, and it usually depends on the data
for example, if you have an event log of user events you often have to send the events for the same user to the same queue
the goal being to maintain order where it’s relevant
if you split data, you have to ask whether it’s okay for everything to be written out of order to whatever the end destination is
and if not how you retain order where it matters
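A small sketch of that routing rule (the Event type, getUserId() and the per-partition queues are hypothetical names):
import java.util.List;
import java.util.Queue;
// Events for the same user always hash to the same partition, so they stay in
// order relative to each other, while different users spread across the queues.
// Event and getUserId() are made-up names for this sketch.
static void route(Event event, List<Queue<Event>> queues) {
  int partition = Math.floorMod(event.getUserId().hashCode(), queues.size());
  queues.get(partition).add(event);
}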
Richard Pijnenburg
@electrical
Mar 14 2016 23:37
sorry. was in a game of Starcraft 2 :p
Yeah. in my case the sequence doesn't really matter too much. only in a very few select cases like the multiline filter
but the rest shouldn't matter too much.
would love to use raft for it with a fast task queue system.
saves using something else
and don’t have to worry about data being on the compute nodes
been reading up a lot on the thread executors on my way home
Jordan Halterman
@kuujo
Mar 14 2016 23:41
indeed
I am actually going to create an example of how to use DGroup for something like that
pretty simple
Richard Pijnenburg
@electrical
Mar 14 2016 23:42
i'll have to make a small implementation but do you think it's possible to have like an init function that has some shared data across workers? for example an object pointer of the geoipdatabase library?
Jordan Halterman
@kuujo
Mar 14 2016 23:43
DistributedGroup group = atomix.getGroup("foo").get();
PartitionGroup partitions = group.partition(32, new RoundRobinPartitioner());
for (String line : theData) {
  GroupPartition partition = partitions.get(line);
  partition.members().forEach(m -> m.connection().send(line));
}
indeed
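And for the shared-init question above, a hypothetical sketch (GeoIpDatabase, its load method and filterEvents are made-up names): load the expensive resource once and hand the same read-only instance to every worker.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// One expensive load, shared by all worker tasks instead of one load per thread.
// GeoIpDatabase and filterEvents are hypothetical.
GeoIpDatabase geoip = GeoIpDatabase.load("/path/to/GeoLite2-City.mmdb");
ExecutorService workers = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
  workers.execute(() -> filterEvents(geoip));  // every task sees the same instance
}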