We are no longer monitoring this channel, please join Slack! https://join.slack.com/t/atomixio/shared_invite/enQtNDgzNjA5MjMyMDUxLTVmMThjZDcxZDE3ZmU4ZGYwZTc2MGJiYjVjMjFkOWMyNmVjYTc5YjExYTZiOWFjODlkYmE2MjNjYzZhNjU2MjY
AtomicDocumentTree
where we changed the paths from root|this|stupid|format
to /this/better/format
String
not in the namespace of ClusterCommunicationService
?
String
Class
String
java.util.concurrent.ExecutionException: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 112
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at cn.ac.iie.di.atomix3test.atomix.n.Client$T.send(Client.java:150)
at cn.ac.iie.di.atomix3test.atomix.n.Client$T.run(Client.java:122)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 112
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:804)
at io.atomix.utils.serializer.Namespace.lambda$null$2(Namespace.java:452)
at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.run(KryoPoolQueueImpl.java:58)
at io.atomix.utils.serializer.Namespace.lambda$deserialize$3(Namespace.java:450)
at io.atomix.utils.serializer.KryoIOPool.run(KryoIOPool.java:45)
at io.atomix.utils.serializer.Namespace.deserialize(Namespace.java:448)
at io.atomix.utils.serializer.Serializer$1.decode(Serializer.java:57)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at io.atomix.cluster.messaging.impl.NettyMessagingService.lambda$null$11(NettyMessagingService.java:491)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:399)
at io.atomix.cluster.messaging.impl.NettyMessagingService.lambda$null$14(NettyMessagingService.java:491)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at io.atomix.cluster.messaging.impl.NettyMessagingService$Callback.complete(NettyMessagingService.java:823)
at io.atomix.cluster.messaging.impl.NettyMessagingService$RemoteClientConnection.dispatch(NettyMessagingService.java:1072)
at io.atomix.cluster.messaging.impl.NettyMessagingService$RemoteClientConnection.access$1000(NettyMessagingService.java:1023)
at io.atomix.cluster.messaging.impl.NettyMessagingService$InboundMessageDispatcher.channelRead0(NettyMessagingService.java:757)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsa
MemberId leader = election.getLeadership().leader().id();
CompletableFuture<byte[]> future = atomix.getCommunicationService().send("test_type", "this is a msg to leader".getBytes(), leader);
while (!future.isDone()) {
try {
sleep(500);
} catch (InterruptedException ex) {
}
}
try {
System.out.println(this.getName() + " - " + count + " - " + new String(future.get()));
} catch (InterruptedException | ExecutionException ex) {
System.err.println(this.getName() + " - " + count + " - error ");
ex.printStackTrace();
}
coming in test_type, addr:127.0.0.1:16176, cmd:this is a msg to leader
coming in test_type, addr:127.0.0.1:16177, cmd:this is a msg to leader
ClusterCommunicationService
change my msg? I can see more bytes received.
// Handler for the "test_type" subject: logs the incoming payload, waits one second,
// then replies with a tagged copy of the payload as bytes.
atomix.getCommunicationService().subscribe("test_type", (byte[] src) -> {
    System.out.println("coming in test_type, addr:" + "unknown" + ", cmd:" + new String(src));
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ex) {
        // Keep the interrupt visible to the executor thread instead of silently discarding it.
        Thread.currentThread().interrupt();
    }
    return ("return_" + i.getAndIncrement() + "_" + new String(src) + "_from_" + server).getBytes();
    // Rejection handler runs an overflowing task directly on the thread that submitted it.
}, new ThreadPoolExecutor(2, 32, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(32), (r, e) -> r.run()));
// Two-member repro exchanging String payloads over the cluster communication service.
// Three bootstrap nodes are declared (foo, bar, baz); only foo and bar are started in this snippet.
Collection<Node> bootstrapLocations = Arrays.asList(
Node.builder().withId("foo").withAddress(Address.from("localhost:5000")).build(),
Node.builder().withId("bar").withAddress(Address.from("localhost:5001")).build(),
Node.builder().withId("baz").withAddress(Address.from("localhost:5002")).build());
// Member "foo" on localhost:5000, configured with a BootstrapDiscoveryProvider over the node list.
AtomixCluster cluster1 = AtomixCluster.builder()
.withMemberId("foo")
.withAddress("localhost:5000")
.withMembershipProvider(BootstrapDiscoveryProvider.builder()
.withNodes(bootstrapLocations)
.build())
.build();
// Wait for startup to finish before continuing.
cluster1.start().join();
assertEquals("foo", cluster1.getMembershipService().getLocalMember().id().id());
// Member "bar" on localhost:5001, using the same node list.
AtomixCluster cluster2 = AtomixCluster.builder()
.withMemberId("bar")
.withAddress("localhost:5001")
.withMembershipProvider(BootstrapDiscoveryProvider.builder()
.withNodes(bootstrapLocations)
.build())
.build();
cluster2.start().join();
// cluster1 answers every "test" message with the String "world!"; the incoming message is ignored.
cluster1.getCommunicationService().subscribe("test", message -> {
return CompletableFuture.completedFuture("world!");
});
// cluster2 sends the String "Hello!" to cluster1's local member id and blocks on the reply.
String result = cluster2.getCommunicationService().<String, String>send("test", "Hello!", cluster1.getMembershipService().getLocalMember().id()).join();
System.out.println(result);
// Two-member repro exchanging byte[] payloads over the cluster communication service.
// Three bootstrap nodes are declared (foo, bar, baz); only foo and bar are started in this snippet.
Collection<Node> bootstrapLocations = Arrays.asList(
Node.builder().withId("foo").withAddress(Address.from("localhost:5000")).build(),
Node.builder().withId("bar").withAddress(Address.from("localhost:5001")).build(),
Node.builder().withId("baz").withAddress(Address.from("localhost:5002")).build());
// Member "foo" on localhost:5000, configured with a BootstrapDiscoveryProvider over the node list.
AtomixCluster cluster1 = AtomixCluster.builder()
.withMemberId("foo")
.withAddress("localhost:5000")
.withMembershipProvider(BootstrapDiscoveryProvider.builder()
.withNodes(bootstrapLocations)
.build())
.build();
// Wait for startup to finish before continuing.
cluster1.start().join();
assertEquals("foo", cluster1.getMembershipService().getLocalMember().id().id());
// Member "bar" on localhost:5001, using the same node list.
AtomixCluster cluster2 = AtomixCluster.builder()
.withMemberId("bar")
.withAddress("localhost:5001")
.withMembershipProvider(BootstrapDiscoveryProvider.builder()
.withNodes(bootstrapLocations)
.build())
.build();
cluster2.start().join();
// cluster1 prints each incoming "test" payload and replies with the bytes of "world!".
cluster1.getCommunicationService().<byte[], byte[]>subscribe("test", message -> {
System.out.println(new String(message));
return CompletableFuture.completedFuture("world!".getBytes());
});
// cluster2 sends the bytes of "Hello!" to cluster1's local member id and blocks on the reply.
byte[] result = cluster2.getCommunicationService().<byte[], byte[]>send("test", "Hello!".getBytes(), cluster1.getMembershipService().getLocalMember().id()).join();
System.out.println(new String(result));
future.get()
future.get()
is exactly the line at cn.ac.iie.di.atomix3test.atomix.n.Client$T.send(Client.java:150)
nvm
mean...
// Counter used to tag each reply produced by the handler below.
AtomicInteger count = new AtomicInteger();
// cluster1 handles "test_type": logs the payload, waits one second, then replies with a tagged copy.
cluster1.getCommunicationService().subscribe("test_type", (byte[] src) -> {
    System.out.println("coming in test_type, addr:" + "unknown" + ", cmd:" + new String(src));
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ex) {
        // Keep the interrupt visible to the executor thread instead of silently discarding it.
        Thread.currentThread().interrupt();
    }
    return ("return_" + count.getAndIncrement() + "_" + new String(src) + "_from_" + 1).getBytes();
    // Rejection handler runs an overflowing task directly on the thread that submitted it.
}, new ThreadPoolExecutor(2, 32, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(32), (r, e) -> r.run()));
// cluster2 sends a byte[] request to cluster1's local member id; join() blocks until the reply arrives.
CompletableFuture<byte[]> future = cluster2.getCommunicationService().send("test_type", "this is a msg to leader".getBytes(), cluster1.getMembershipService().getLocalMember().id());
String response = new String(future.join());
System.out.println(response);
byte[]
byte[]
to a String
do you still get the same exception with the same ID?
byte[]
is registered with ID 16
String
ok
String
?
// Two-member repro exchanging byte[] payloads over the cluster communication service.
// Three bootstrap nodes are declared (foo, bar, baz); only foo and bar are started in this snippet.
Collection<Node> bootstrapLocations = Arrays.asList(
Node.builder().withId("foo").withAddress(Address.from("localhost:5000")).build(),
Node.builder().withId("bar").withAddress(Address.from("localhost:5001")).build(),
Node.builder().withId("baz").withAddress(Address.from("localhost:5002")).build());
// Member "foo" on localhost:5000, configured with a BootstrapDiscoveryProvider over the node list.
AtomixCluster cluster1 = AtomixCluster.builder()
.withMemberId("foo")
.withAddress("localhost:5000")
.withMembershipProvider(BootstrapDiscoveryProvider.builder()
.withNodes(bootstrapLocations)
.build())
.build();
// Wait for startup to finish before continuing.
cluster1.start().join();
assertEquals("foo", cluster1.getMembershipService().getLocalMember().id().id());
// Member "bar" on localhost:5001, using the same node list.
AtomixCluster cluster2 = AtomixCluster.builder()
.withMemberId("bar")
.withAddress("localhost:5001")
.withMembershipProvider(BootstrapDiscoveryProvider.builder()
.withNodes(bootstrapLocations)
.build())
.build();
cluster2.start().join();
// cluster1 prints each incoming "test" payload and replies with the bytes of "world!".
cluster1.getCommunicationService().<byte[], byte[]>subscribe("test", message -> {
System.out.println(new String(message));
return CompletableFuture.completedFuture("world!".getBytes());
});
// cluster2 sends the bytes of "Hello!" to cluster1's local member id and blocks on the reply.
byte[] result = cluster2.getCommunicationService().<byte[], byte[]>send("test", "Hello!".getBytes(), cluster1.getMembershipService().getLocalMember().id()).join();
System.out.println(new String(result));
.data
dir
StorageLevel
to MEMORY
.withManagementGroup(RaftPartitionGroup.builder("systen")
.withNumPartitions(1).withMembers("server1", "server2", "server3")
.withStorageLevel(StorageLevel.MEMORY).build())
PS D:\Storage\work\huayan\data_exchange\project\Atomix3Test> ls
目录: D:\Storage\work\huayan\data_exchange\project\Atomix3Test
Mode LastWriteTime Length Name
---- ------------- ------ ----
d----- 2018/8/1 10:10 .data
d----- 2018/4/2 16:42 .projectKnowledge
d----- 2018/4/24 13:49 src
d----- 2018/7/23 10:02 target
-a---- 2018/7/30 16:01 8 .gitignore
-a---- 2018/7/30 16:01 1408 pom.xml
.data.
dir still exists
test_type
or something?
PS D:\Storage\work\huayan\data_exchange\project\Atomix3Test\.data\system\partitions\1> ls
目录: D:\Storage\work\huayan\data_exchange\project\Atomix3Test\.data\system\partitions\1
Mode LastWriteTime Length Name
---- ------------- ------ ----
-a---- 2018/8/1 10:19 128 system-partition-1.meta
PS D:\Storage\work\huayan\data_exchange\project\Atomix3Test\.data\system\partitions\1>
data-partition-1.meta
/
is the root path
/
is represented
/
. This allows a relative path or child paths to be correctly represented
""
to make it absolute
from(String...)
method should be fixed