These are chat archives for atomix/atomix

15th Aug 2018
william.z
@zwillim
Aug 15 2018 08:00

I got this error when starting 3 servers with Raft partition groups:

io.atomix.storage.StorageException: Failed to acquire storage lock; ensure each Raft server is configured with a distinct storage directory

But I set the storage level to MEMORY like this:

        // builder is the local node's Atomix builder
        builder.withMemberId(server).withAddress("127.0.0.1", port)
                .withMembershipProvider(new BootstrapDiscoveryProvider(
                        Node.builder().withId("server1").withAddress("127.0.0.1:5001").build(),
                        Node.builder().withId("server2").withAddress("127.0.0.1:5002").build(),
                        Node.builder().withId("server3").withAddress("127.0.0.1:5003").build()))
                .withManagementGroup(RaftPartitionGroup.builder("system")
                        .withNumPartitions(1).withMembers("server1", "server2", "server3")
                        .withStorageLevel(StorageLevel.MEMORY).build())
                .addPartitionGroup(RaftPartitionGroup.builder("data1")
                        .withNumPartitions(1).withMembers("server1", "server2", "server3")
                        .withStorageLevel(StorageLevel.MEMORY).build());

I'm confused.

MiniDigger
@MiniDigger
Aug 15 2018 08:20
hey guys, do you happen to work on, or know of, an open source project that uses Atomix 3? I want to learn more about how it's used so that I can design my system properly
Jordan Halterman
@kuujo
Aug 15 2018 08:21
Sure… Atomix is primarily developed for ONOS and it’s likely the most thorough user of Atomix:
https://onosproject.org/
https://github.com/opennetworkinglab/onos
I’m also the architect for distributed systems in ONOS, so ask me anything about how Atomix is used there and I can answer
Junbo Ruan
@aruanruan
Aug 15 2018 08:23
@kuujo did you figure out the Raft protocol error from long-running tests yet?
Jordan Halterman
@kuujo
Aug 15 2018 08:24
nope
@zwillim even when StorageLevel.MEMORY is used, some files are still needed, e.g. to safely track votes, configuration, etc.
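
For example, each server on the same machine can be given its own directory. A minimal sketch, assuming RaftPartitionGroup.Builder#withDataDirectory from the 3.x API (the paths are illustrative):

        // Hypothetical sketch: a distinct per-server data directory avoids the
        // storage-lock collision when several servers run on one machine, even
        // though the log itself stays in memory.
        .withManagementGroup(RaftPartitionGroup.builder("system")
                .withNumPartitions(1).withMembers("server1", "server2", "server3")
                .withStorageLevel(StorageLevel.MEMORY)
                .withDataDirectory(new File("/tmp/atomix/" + server + "/system"))
                .build())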
@aruanruan what are you referring to?
Junbo Ruan
@aruanruan
Aug 15 2018 08:26
when I stress test for a long time, the cluster becomes disconnected
it reports a 'RAFT protocol' error
14:30:01.264 [raft-server-system-partition-2] TRACE i.a.p.raft.roles.LeaderAppender - RaftServer{system-partition-2} - Received AppendResponse{status=OK, term=2, succeeded=true, lastLogIndex=4237} from node2
14:30:01.264 [raft-server-system-partition-2] TRACE i.a.p.raft.roles.LeaderAppender - RaftServer{system-partition-2} - Committed entries up to 4237
14:30:01.271 [netty-messaging-event-epoll-client-103] TRACE i.a.c.m.impl.NettyMessagingService - cloudate1:8000 - Received message type atomix-cluster-heartbeat from 10.78.10.219:5680
14:30:01.271 [atomix-bootstrap-heartbeat-receiver] TRACE i.a.c.d.BootstrapDiscoveryProvider - cloudate1:8000 - Received heartbeat: cloudate1:8000
14:30:01.273 [raft-server-system-partition-2] TRACE i.a.p.raft.roles.LeaderAppender - RaftServer{system-partition-2} - Received AppendResponse{status=OK, term=2, succeeded=true, lastLogIndex=4237} from node3
14:30:01.273 [raft-server-system-partition-2] ERROR i.a.p.raft.impl.RaftServiceManager - RaftServer{system-partition-2} - Cannot apply index 4237 (hasNext: true, nextIndex: 172)
14:30:01.273 [raft-server-system-partition-2] TRACE i.a.protocols.raft.roles.LeaderRole - RaftServer{system-partition-2}{role=LEADER} - Sending OpenSessionResponse{status=ERROR, error=RaftError{type=PROTOCOL_ERROR, message=Cannot apply index 4237}}
14:30:01.274 [raft-client-system-partition-2-5] TRACE i.a.p.r.s.impl.RaftSessionConnection - RaftClient{system-partition-2} - Received OpenSessionResponse{status=ERROR, error=RaftError{type=PROTOCOL_ERROR, message=Cannot apply index 4237}} from node1
14:30:01.283 [netty-messaging-event-epoll-client-109] TRACE i.a.c.m.impl.NettyMessagingService - cloudate1:8000 - Received message type atomix-cluster-metadata from 10.78.10.219:5680
Jordan Halterman
@kuujo
Aug 15 2018 08:28
what constitutes a stress test?
Junbo Ruan
@aruanruan
Aug 15 2018 08:29
just writing lots of data in a loop
Jordan Halterman
@kuujo
Aug 15 2018 08:29
that doesn’t look like a very long time
I’ve been trying to reproduce this bug
can you put your code in a gist?
I need the trace logs
Junbo Ruan
@aruanruan
Aug 15 2018 08:29
yes, it's not too long
Jordan Halterman
@kuujo
Aug 15 2018 08:29
awesome
@MiniDigger 👆
Junbo Ruan
@aruanruan
Aug 15 2018 08:31
it seems like one node cannot keep up with the Raft progress, or the index rolls over
william.z
@zwillim
Aug 15 2018 08:32
@kuujo Ok, thx
Jordan Halterman
@kuujo
Aug 15 2018 08:33
well, it looks like some sort of race when applying entries from the log
Junbo Ruan
@aruanruan
Aug 15 2018 08:34
Today I built a new Atomix cluster on a 1 Gbit/s network; it still happened
MiniDigger
@MiniDigger
Aug 15 2018 08:34
ok, I have a simple architectural question. In my head my current setup looks like this: I have a group of manager nodes which coordinate tasks that are executed by worker nodes. The managers need to be consistent with each other, one task should only be executed by one worker, and if that worker goes down/becomes unreachable another worker should pick it up. It seems like the work queue is what I want for that. But how would I lay out the overall cluster? I want Raft between the managers, but what would I use for the workers? Primary-backup? Is the Consistent Data-Grid architecture what I want, or the Raft Client-Server (going from https://atomix.io/docs/latest/user-manual/introduction/architectures/)?
Junbo Ruan
@aruanruan
Aug 15 2018 08:36
my env: Linux CentOS 7.4 64-bit, 64 GB RAM, 4×10 CPU cores, 1 Gbit/s network
william.z
@zwillim
Aug 15 2018 08:39

My handler needs a lot of time during communication, like this:

        // i is an AtomicInteger and server is the local member ID, both from the
        // enclosing scope
        atomix.getCommunicationService().subscribe("test_type", (byte[] src) -> {
            System.out.println("coming in test_type, addr:" + "unknown" + ", cmd:" + new String(src));
            try {
                Thread.sleep(100000); // simulate a long-running handler
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            return ("return_" + i.getAndIncrement() + "_" + new String(src) + "_from_" + server).getBytes();
        }, new ThreadPoolExecutor(2, 32, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), (r, e) -> r.run()));

And I got this timeout error:

java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Request timed out in 20022 milliseconds
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at io.atomix.cluster.messaging.impl.NettyMessagingService.lambda$null$13(NettyMessagingService.java:500)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:399)
    at io.atomix.cluster.messaging.impl.NettyMessagingService.lambda$null$14(NettyMessagingService.java:500)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at io.atomix.cluster.messaging.impl.NettyMessagingService$Callback.completeExceptionally(NettyMessagingService.java:827)
    at io.atomix.cluster.messaging.impl.NettyMessagingService$AbstractClientConnection.timeoutCallbacks(NettyMessagingService.java:910)
    at io.atomix.cluster.messaging.impl.NettyMessagingService.timeoutAllCallbacks(NettyMessagingService.java:338)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Request timed out in 20022 milliseconds
    ... 9 more

Is there any API to change the timeout?

Jordan Halterman
@kuujo
Aug 15 2018 08:57
@zwillim the ClusterCommunicationService methods are overloaded with an optional Duration which is the response timeout.
Well, the Duration overrides the default dynamically computed timeout
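
A minimal sketch of passing the timeout (the exact overload shape and the member ID here are assumptions, not a confirmed signature; encoder/decoder arguments may be required):

        // Hypothetical sketch: override the computed response timeout with an
        // explicit Duration on a request-reply send.
        CompletableFuture<byte[]> reply = atomix.getCommunicationService().send(
                "test_type",              // subject, as in the snippet above
                "hello".getBytes(),       // request payload
                MemberId.from("server2"), // target member (illustrative)
                Duration.ofMinutes(5));   // response timeout override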
Jordan Halterman
@kuujo
Aug 15 2018 09:11

@MiniDigger There are a few different ways to go about this depending on the performance and consistency requirements:

  1. The work queue sounds like the right abstraction from what you’ve described. In order to get the best performance, you can configure a Raft management group and a primary-backup partition group on the manager nodes. Create the work queue using the primary-backup partitions and distribute tasks to the workers through that queue. With this configuration, Raft will be used to elect primaries, and tasks will be replicated in memory in the primary-backup partitions for redundancy. This is where the use of multiple protocols comes in handy (see the sketch after this list).

  2. If you happen to have persistence requirements, you could use Raft for the work queue instead, but there will be a big difference in throughput and latency.

  3. If you need certain tasks to be correlated, you can partition the set of tasks and use a LeaderElector to elect multiple leaders and assign related tasks to the same node. In that case, you use the ClusterCommunicationService to subscribe to events for some partition, and to distribute tasks to a worker you look up the leader for the partition and send the task directly to it. This can be very efficient since it’s not going through a replication protocol. But it can be a lot of overhead if there’s a lot of lag between the producer and consumer, since the only queue between the two is then Netty.
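
A configuration sketch for option 1, assuming the Atomix 3.x builder APIs (the group names, partition counts, backup count, and the MultiPrimaryProtocol usage are illustrative assumptions):

        // Hypothetical sketch: Raft "system" group for consistent metadata and
        // primary election; primary-backup "data" group for fast in-memory
        // replication of the queued tasks.
        Atomix atomix = Atomix.builder()
                .withMemberId("manager1").withAddress("10.0.0.1", 5679)
                .withManagementGroup(RaftPartitionGroup.builder("system")
                        .withNumPartitions(1)
                        .withMembers("manager1", "manager2", "manager3")
                        .build())
                .addPartitionGroup(PrimaryBackupPartitionGroup.builder("data")
                        .withNumPartitions(32)
                        .build())
                .build();
        atomix.start().join();

        // Work queue replicated through the primary-backup partitions
        WorkQueue<String> queue = atomix.<String>workQueueBuilder("tasks")
                .withProtocol(MultiPrimaryProtocol.builder("data")
                        .withBackups(2)
                        .build())
                .build();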

william.z
@zwillim
Aug 15 2018 09:11
I found it. Can I set an unlimited duration?
Jordan Halterman
@kuujo
Aug 15 2018 09:12
Long.MAX_VALUE is fine. Just know that if some process doesn’t respond to a message you could have a memory leak
william.z
@zwillim
Aug 15 2018 09:12
ok, I'll try
Jordan Halterman
@kuujo
Aug 15 2018 09:13
Another way to go about it is to just unicast the message with a unique ID, then unicast back an ack and manage the correlation outside the ClusterCommunicationService
Not much difference between the two, except that your code has more domain knowledge than the communication service
william.z
@zwillim
Aug 15 2018 09:14
this looks like an async process.
Jordan Halterman
@kuujo
Aug 15 2018 09:15
So it can maybe be made more efficient with that knowledge
The messaging service will hold on to some memory for each request-reply message and periodically check for timeouts even when the timeout is functionally infinite
So in that sense it would indeed be more efficient to use the above pattern
But whether it’s worth it depends on how many messages are outstanding at once
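
A rough sketch of the unicast-plus-ack pattern from above (the subjects, member IDs, message framing, and the exact unicast/subscribe overloads are all illustrative assumptions):

        // Hypothetical sketch: request-reply over unicast with app-level correlation.
        Map<String, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

        // Sender: tag each task with a unique ID and track it until the ack arrives.
        String id = UUID.randomUUID().toString();
        pending.put(id, new CompletableFuture<>());
        atomix.getCommunicationService().unicast(
                "work", (id + "|task-payload").getBytes(), MemberId.from("worker1"));

        // Sender: acks arrive on a separate subject; complete the matching future.
        atomix.getCommunicationService().subscribe("work-ack", (byte[] ackedId) -> {
            CompletableFuture<Void> f = pending.remove(new String(ackedId));
            if (f != null) {
                f.complete(null); // no per-message timeout state held by Atomix
            }
        }, Executors.newSingleThreadExecutor());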
Junbo Ruan
@aruanruan
Aug 15 2018 09:18
@zwillim your scenario looks like message queuing
such as activemq, rabbitmq, kafka
william.z
@zwillim
Aug 15 2018 09:20
I see. Actually I need sync communication, during which some operations are handled.
there won't be a lot of communication at a time, and not a lot in total either
I will try using a large duration
MiniDigger
@MiniDigger
Aug 15 2018 09:32
ok, nice, thank you. hopefully I will find time to hack around with it today, looks really nice so far
Johno Crawford
@johnou
Aug 15 2018 11:21
@aruanruan any luck with the trace logs?
Jordan Halterman
@kuujo
Aug 15 2018 17:16
hopefully this is still enough information to reproduce it… it seems like it should be, but I’m guessing something about the environment makes those errors more likely
Although, I wonder if this could be something that was never cherry-picked from the 2.0 branch
Jordan Halterman
@kuujo
Aug 15 2018 20:21
well, you’d think this could be reproduced in RaftPerformanceTest but… not seeing it
Jordan Halterman
@kuujo
Aug 15 2018 22:10
I put an awful lot of load on a cluster with no luck reproducing it
Jordan Halterman
@kuujo
Aug 15 2018 22:58
That test burns my legs when I run it on my laptop :-P
Jordan Halterman
@kuujo
Aug 15 2018 23:18
I guess I can try to deduce the problem 🤷‍♂️
Junbo Ruan
@aruanruan
Aug 15 2018 23:43
hi
Jordan Halterman
@kuujo
Aug 15 2018 23:44
hey
Junbo Ruan
@aruanruan
Aug 15 2018 23:45
the trace logs are huge, and for company security reasons I can't upload log files
Jordan Halterman
@kuujo
Aug 15 2018 23:45
I don’t need the logs
I just need a reproducer
Junbo Ruan
@aruanruan
Aug 15 2018 23:45
emm
Jordan Halterman
@kuujo
Aug 15 2018 23:46
my code above which does a million writes in a couple minutes doesn’t ever seem to reproduce it
Junbo Ruan
@aruanruan
Aug 15 2018 23:46
that means you haven't reproduced the issue yet
Jordan Halterman
@kuujo
Aug 15 2018 23:47
I’ve seen it once or twice but can’t fix it without reproducing it with trace logging enabled or a debugger attached