These are chat archives for atomix/atomix

21st
Mar 2016
Joachim De Beule
@joachimdb
Mar 21 2016 13:40

Hi again. On closing a replica I get this in the logs (repeatedly):

2016-03-21 13:29:49.845 [Thread-0] INFO  eais.atomix - FSA component AtomixComponent - Calling final state
2016-03-21 13:29:49.845 [async-dispatch-16] INFO  eais.atomix - Node  10.10.3.11:8010  left group  metrics
2016-03-21 13:29:50.060 [copycat-server-server311.engagor.com/10.10.3.11:8020] INFO  i.a.catalyst.transport.NettyClient - Connecting to server301.engagor.com/10.10.3.1:8020
2016-03-21 13:29:50.068 [copycat-server-server311.engagor.com/10.10.3.11:8020] WARN  io.netty.channel.AbstractChannel - Force-closing a channel whose registration task was not accepted by an event loop: [id: 0x243183a7]
java.util.concurrent.RejectedExecutionException: event executor terminated
    at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:715) ~[eais-standalone.jar:na]
    at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:300) ~[eais-standalone.jar:na]
    at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:691) ~[eais-standalone.jar:na]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:421) ~[eais-standalone.jar:na]
    at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:60) [eais-standalone.jar:na]
    at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:48) [eais-standalone.jar:na]
    at io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:64) [eais-standalone.jar:na]
    at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:315) [eais-standalone.jar:na]
    at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134) [eais-standalone.jar:na]
    at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116) [eais-standalone.jar:na]
    at io.atomix.catalyst.transport.NettyClient.connect(NettyClient.java:93) [eais-standalone.jar:na]
    at io.atomix.copycat.server.state.ConnectionManager.createConnection(ConnectionManager.java:69) [eais-standalone.jar:na]
    at io.atomix.copycat.server.state.ConnectionManager.getConnection(ConnectionManager.java:47) [eais-standalone.jar:na]
    at io.atomix.copycat.server.state.AbstractAppender.sendAppendRequest(AbstractAppender.java:242) [eais-standalone.jar:na]
    at io.atomix.copycat.server.state.LeaderAppender.sendAppendRequest(LeaderAppender.java:356) [eais-standalone.jar:na]
    at io.atomix.copycat.server.state.LeaderAppender.appendEntries(LeaderAppender.java:206) [eais-standalone.jar:na]
    at io.atomix.copycat.server.state.LeaderAppender.appendEntries(LeaderAppender.java:110) [eais-standalone.jar:na]
    at io.atomix.copycat.server.state.LeaderState.appendMembers(LeaderState.java:143) [eais-standalone.jar:na]
    at io.atomix.catalyst.util.concurrent.Runnables.lambda$logFailure$17(Runnables.java:20) ~[eais-standalone.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_74]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_74]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_74]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_74]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_74]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_74]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_74]
2016-03-21 13:29:50.069 [copycat-server-server311.engagor.com/10.10.3.11:8020] ERROR i.n.u.c.D.rejectedExecution - Failed to submit a listener notification task. Event loop shut down?

Any ideas? (This is Atomix rc2)

The other replicas in the cluster are still running at that moment
(5 replicas, 15 clients)
Chris Cleveland
@ccleve
Mar 21 2016 16:12

I need to handle a large number of data streams, possibly thousands. Streams consist of records. Here are the requirements:

  1. Each stream should get written to at least three boxes for redundancy.

  2. The order in which the records are written must be the same on each box, and if the master box for a stream goes down, then there needs to be a smooth transition to one of the replicas.

  3. I need to be able to control the placement of the streams on the boxes because they're not all the same. Some streams will be very active, generating a lot of data at a high rate, and some will be very quiet. We'll need to be able to move them around.

  4. Records need to get written in a transaction-safe manner. We can't lose records. This is a system that can afford some inefficiency and lower performance in exchange for safety. A two-phase commit would be ok.

  5. I have to be able to implement some kind of automated healing and repair so that if a box goes down, we can create and populate a replacement so we still have three copies of the data for each stream.

This sounds like it's tailor-made for Kafka, but Kafka doesn't do automatic healing, and it's a very heavy system with tons of dependencies. Also, it uses Zookeeper for metadata management, but not for the data streams themselves, so it's not fully transaction-safe.

Would it be possible to build such a system on top of Atomix?

Jordan Halterman
@kuujo
Mar 21 2016 16:47
Hey.. Very good question, and I have some answers for you...
Actually let me get back to my computer and answer there :-)
Chris Cleveland
@ccleve
Mar 21 2016 19:50
Eagerly awaiting your answers...
Jordan Halterman
@kuujo
Mar 21 2016 19:51
Agh sorry thanks for pinging me! Sidetracked by meetings
Jordan Halterman
@kuujo
Mar 21 2016 20:01
So, this is actually something I discuss quite frequently with people. It seems like this would be a great use case for a sort of sharded Raft algorithm wherein each shard is a unique instance of the algorithm with a unique leader through which writes can be committed, similar to how Kafka's partitions work. Indeed, this is something I've wanted to do for a long time, and Copycat's abstractions are intentionally designed to facilitate this. The reason it hasn't yet been implemented is the challenge of coordinating across shards: we'd have to implement some sort of two-phase commit to handle e.g. splitting or scaling shards. But for use cases that don't require any coordination across shards (which Atomix would need for managing resource instances within the shards), it would be feasible to do sharding in Copycat simply by implementing a shared Transport and starting multiple CopycatServers. This is fairly straightforward to do; it's purely the resource creation and deletion in Atomix that is the reason for not having implemented it (having to account for failures while e.g. deleting a resource in all shards). But, since sharding through the Raft algorithm has not yet been implemented...
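
A rough sketch of the "multiple CopycatServers, one per shard" approach described above. The builder calls approximate the Copycat API of this era and may differ from your version; the addresses, StreamStateMachine, and the shard layout are placeholders:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import io.atomix.catalyst.transport.Address;
    import io.atomix.catalyst.transport.NettyTransport;
    import io.atomix.copycat.server.CopycatServer;
    import io.atomix.copycat.server.StateMachine;
    import io.atomix.copycat.server.storage.Storage;

    public class ShardedCopycatServers {

      // Placeholder state machine; a real one would register the stream-append operations.
      static class StreamStateMachine extends StateMachine {
      }

      public static void main(String[] args) {
        int shardCount = 3;
        List<CopycatServer> shards = new ArrayList<>();

        for (int i = 0; i < shardCount; i++) {
          // Each shard is an independent Raft cluster on its own port with its own log directory.
          Address local = new Address("10.10.3.11", 9000 + i);
          List<Address> members = Arrays.asList(
              new Address("10.10.3.11", 9000 + i),
              new Address("10.10.3.12", 9000 + i),
              new Address("10.10.3.13", 9000 + i));

          CopycatServer server = CopycatServer.builder(local, members)
              .withTransport(new NettyTransport())         // one transport per server here; a custom
                                                           // shared Transport could multiplex shards
              .withStorage(new Storage("logs/shard-" + i))
              .withStateMachine(new StreamStateMachine())  // instance vs. Supplier depends on the version
              .build();

          shards.add(server);
          server.open();                                   // open() in this era; later releases renamed this
        }
      }
    }

Each of those servers elects its own leader, so writes for a given shard's streams go through that shard's leader, which is the Kafka-partition-like behavior being described.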
Chris Cleveland
@ccleve
Mar 21 2016 20:06
I was thinking this was the way to do it, to have one Copycat server per shard. Trouble is, we might have dozens or hundreds of shards per server. Would this be a resource problem? If it requires one socket per Copycat server, then it might be, because there would be all kinds of chatter.
Jordan Halterman
@kuujo
Mar 21 2016 20:10
What Atomix does have is the concept of PartitionGroups, and what this gives you is the abstraction necessary to build something like Kafka. PartitionGroups are designed exactly like Kafka's partitions. You create a DistributedGroup and partition it into a PartitionGroup. Each partition can consist of multiple members, and within each GroupPartition a unique leader is elected. Consistent hashing is used to hash partitions to nodes. When a node crashes, its partitions will be reassigned and a GroupPartitionMigration is provided to the user to move data as necessary. DistributedGroup provides abstractions for direct messaging between members over the Transport or persistent messaging through the Raft algorithm. The obvious problem with this is that it's much more low level: it provides the ability to manage a cluster and partition data, but doesn't actually do the partitioning. That is left up to the user. But the reason DistributedGroup has been designed that way is that future resources will be built on top of it. Rather than implementing sharding at the Raft level, resources will use a PartitionGroup to provide the option of e.g. a partitioned DistributedMap. But the problem with needing stronger guarantees than Kafka is that PartitionGroup doesn't quite get you all the way there. It's fairly easy to implement what Kafka does on top of it, but for stronger guarantees you may end up having to duplicate a lot of what Raft does in terms of ensuring order and things like that...
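
To make the shape of that concrete, here is a hypothetical sketch of what a PartitionGroup hands the application. These interfaces only mirror the description above (reusing the names mentioned there) and are not the actual Atomix classes:

    import java.util.Collection;
    import java.util.function.Consumer;

    // Hypothetical shapes only, not the real Atomix API.
    interface PartitionGroup {
      Collection<GroupPartition> partitions();                       // partitions hashed to nodes
      void onMigration(Consumer<GroupPartitionMigration> callback);  // fired when a crashed node's partitions move
    }

    interface GroupPartition {
      Collection<String> members();                    // IDs of the members replicating this partition
      void onLeaderElection(Consumer<String> leader);  // a unique leader is elected within each partition
    }

    interface GroupPartitionMigration {
      GroupPartition partition();
      String source();   // member the partition is moving from
      String target();   // member the partition is moving to
    }

Everything on the data path (appending records, replicating them within a partition, copying them during a migration) is left to application code, which is the "much more low level" part.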
Something I have considered is a simpler implementation of sharding in Atomix where each resource key would map to a shard, and it would be the user's responsibility to manage the sharding externally. For example, a user could create queue1, queue2, and queue3, each of which is mapped to a different instance of the Raft algorithm. This would be much easier to implement in Atomix, and the complexities of managing resources across shards would be left to the user and perhaps implemented in some later version.
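
For illustration, the kind of external routing that would put on the user: a minimal sketch that assumes one AtomixClient per independent Raft cluster. ShardRouter and shardFor() are hypothetical helper names, not part of Atomix:

    import java.util.List;

    import io.atomix.AtomixClient;

    /**
     * User-managed sharding: the application decides which independent Raft
     * cluster (shard) owns a given resource name such as "queue1".
     */
    class ShardRouter {
      private final List<AtomixClient> shards;   // one client per independent Atomix cluster

      ShardRouter(List<AtomixClient> shards) {
        this.shards = shards;
      }

      /** Maps a resource name to one of the shard clusters by simple hashing. */
      AtomixClient shardFor(String resourceName) {
        return shards.get(Math.floorMod(resourceName.hashCode(), shards.size()));
      }
    }

The resource itself (queue1, queue2, queue3) would then be created on whichever cluster shardFor() returns, and anything fancier, like rebalancing or moving a hot stream to its own shard, stays in application code.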
Jordan Halterman
@kuujo
Mar 21 2016 20:19
TBH I'm not really sure what the limit in terms of Raft shards would be. The problem is, I think once you get to that number of shards, there need to be optimizations that can only be made by a system that is designed for sharding. Copycat is designed for single-instance use with the abstractions necessary to do sharding, but it's really naive sharding. Other sharded implementations of the Raft algorithm have optimizations to do things like share heartbeats across shards. If 100 Copycat servers shared the same Transport, 100 heartbeats would be sent each heartbeat interval. Similarly, client sessions have to be managed for each shard. I don't think any of the current sharded implementations do sessions, and this is where you start to run into the issues with coordinating across shards and start needing something like 2pc again. If a client is communicating with n shards, then it needs to register n sessions and send n keep-alives. This would require a bit more work to perhaps create a ShardedClient and ShardedServer that can split a single keep-alive from the client into one per shard.
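
A purely hypothetical sketch of that ShardedClient idea; none of these types exist in Copycat. It only illustrates why n shards currently imply n sessions and n keep-alive streams, and what collapsing them into a single logical keep-alive would look like:

    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    // Hypothetical types only; Copycat has no ShardedClient today.
    interface ShardSession {
      CompletableFuture<Void> keepAlive();   // one session (and keep-alive timer) per shard
    }

    class ShardedClient {
      private final List<ShardSession> sessions;   // n shards -> n sessions

      ShardedClient(List<ShardSession> sessions) {
        this.sessions = sessions;
      }

      /** One logical keep-alive from the application fans out into one per shard. */
      CompletableFuture<Void> keepAlive() {
        return CompletableFuture.allOf(
            sessions.stream().map(ShardSession::keepAlive).toArray(CompletableFuture[]::new));
      }
    }

The server side would need the inverse: splitting that single keep-alive back out so each shard's leader can track session liveness, which is where the cross-shard coordination problems come back in.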
Alternatively, you could register the client's session with one master Raft instance and use 2pc to notify all the shards. But that introduces lots of other failure scenarios, which is why this approach has generally been avoided. We've chosen instead to focus on stability while slowly moving that way, and the DGroup abstractions are a way of moving in that direction in Atomix. I'm not sure this will ever be fully implemented at the Copycat level. It's more likely that something similar to what Kafka does will be implemented in Atomix itself, at least at first.
Jordan Halterman
@kuujo
Mar 21 2016 20:25
I think sessions in particular pose challenges for sharding in Raft. It's likely feasible with a few shards, but probably not with hundreds of shards.
At least in Copycat
Chris Cleveland
@ccleve
Mar 21 2016 20:45
Thanks. This answer has saved me a ton of time. Now I know what issues I need to think about. Will keep poking around for a solution.
Jordan Halterman
@kuujo
Mar 21 2016 21:32
No problem good luck!
Richard Pijnenburg
@electrical
Mar 21 2016 21:57
@kuujo how you doing bud?
Jordan Halterman
@kuujo
Mar 21 2016 22:40
Great. Busy.
Richard Pijnenburg
@electrical
Mar 21 2016 22:41
Hehe I can imagine. How's the new position?
Jordan Halterman
@kuujo
Mar 21 2016 22:44
Great. I'm like an expert on meetings now or something
Actually has just been a lot of planning but now starting to build stuff finally. First time I spent all weekend working on work stuff rather than open source projects
in a while at least
It's really pretty awesome though
I've tricked people into putting me in a position where I get to decide to build cool shit :-)
Jean-François Im
@jfim
Mar 21 2016 22:53
haha awesome place to be
Richard Pijnenburg
@electrical
Mar 21 2016 22:54
Haha great to hear indeed
I'm in my 3rd week now at my new job. Found out they actually hired me with the intention to make changes. They will be in for some surprises lol
Jean-François Im
@jfim
Mar 21 2016 22:56
Just rewrite everything in clojure/erlang/haskell
job security right there :P
Richard Pijnenburg
@electrical
Mar 21 2016 22:57
Hahaha
Some guys at my old job are/were doing that. They should be fired :)
Jean-François Im
@jfim
Mar 21 2016 22:57
but they can't now, that's the whole point :)
Richard Pijnenburg
@electrical
Mar 21 2016 22:58
Haha. Stuff they built so far can be replaced. They love reinventing the wheel.
I'm just happy I'm gone from that place :)
Jean-François Im
@jfim
Mar 21 2016 22:59
cool :)
Jordan Halterman
@kuujo
Mar 21 2016 23:27
Lol omg
The world would be a lot different if it was run by engineers
Full of half built buildings
Ambitious ones though
With ten elevators that all take you to the same place but none of them travel in a straight line
And no signs to guide you
Or only signs that are old and irrelevant
And misguiding
Richard Pijnenburg
@electrical
Mar 21 2016 23:31
Hahaha so true :)
Time for me to get some sleep. Take care guys.