Hi, we have a use case where the server needs to reject a connecting RSocket client if it has the same ID as an already connected client.
So far we’ve used @ConnectMapping to register clients, and this method also returns Mono.error(RejectedSetupException()) in case of a duplicate.
Now, on the client side, the rejected client receives two exceptions:
ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545)
at io.rsocket.transport.netty.WebsocketDuplexConnection.doOnClose(WebsocketDuplexConnection.java:72)
at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30)
...
at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51)
and
reactor.core.Exceptions$ErrorCallbackNotImplemented: RejectedSetupException (0x3): MPC Gateway with ID 'mpc1' already connected
Caused by: io.rsocket.exceptions.RejectedSetupException: MPC Gateway with ID 'mpc1' already connected
at io.rsocket.exceptions.Exceptions.from(Exceptions.java:62)
at io.rsocket.core.RSocketRequester.lambda$tryTerminateOnZeroError$4(RSocketRequester.java:313)
at io.rsocket.core.RSocketRequester.tryTerminate(RSocketRequester.java:318)
at io.rsocket.core.RSocketRequester.tryTerminateOnZeroError(RSocketRequester.java:313)
at io.rsocket.core.RSocketRequester.handleStreamZero(RSocketRequester.java:224)
at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:209)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248)
Both are ErrorCallbackNotImplemented.
Question: which is the correct place to hook into in order to handle these errors? Maybe we’re missing something basic here.
Another question is, should we look into interceptors instead of @ConnectMapping for the use case above?
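On the duplicate-ID check itself (separate from the question of where to hook the client-side errors), here is a minimal sketch in plain Java, assuming the server keeps its own registry of connected clients. ClientRegistry and its method names are hypothetical and not part of Spring or RSocket. The point it tries to show is that putIfAbsent makes the check-and-register step atomic, so two clients connecting with the same ID at the same moment cannot both pass the check.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical helper: tracks connected clients by ID and refuses duplicates. */
final class ClientRegistry<C> {
    private final ConcurrentMap<String, C> clients = new ConcurrentHashMap<>();

    /** Atomically registers the client; returns false if the ID is already taken. */
    boolean register(String id, C client) {
        return clients.putIfAbsent(id, client) == null;
    }

    /**
     * Removes the entry only if it still maps to this exact client,
     * so a rejected duplicate cannot evict the original on disconnect.
     */
    boolean unregister(String id, C client) {
        return clients.remove(id, client);
    }

    /** Looks up a connected client; empty if the ID is unknown. */
    Optional<C> find(String id) {
        return Optional.ofNullable(clients.get(id));
    }
}
```

In a @ConnectMapping handler one could call register(id, requester), return the RejectedSetupException error when it yields false, and call unregister(id, requester) once the connection closes.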
Please suggest changes to the code. The code is working as I want it to. There are two
RSocket clients developed with Spring (Java) and one Spring RSocket server. The flow is: the server is running and stores connected clients in a map using
private final List<RSocketRequester> CLIENTS = new ArrayList<>();
private final Map<String, RSocketRequester> clientsMap = new HashMap<>();
@ConnectMapping("shell-client")
void connectShellClientAndAskForTelemetry(RSocketRequester requester,
                                          @Payload String client) {
    requester.rsocket()
            .onClose()
            .doFirst(() -> {
                // Add all new clients to a client list
                log.info("Client: {} CONNECTED.", client);
                CLIENTS.add(requester);
                clientsMap.put(client, requester);
            })
            .doOnError(error -> {
                // Warn when channels are closed by clients
                log.warn("Channel to client {} CLOSED", client);
            })
            .doFinally(consumer -> {
                // Remove disconnected clients from the client list
                CLIENTS.remove(requester);
                clientsMap.remove(client);
                log.info("Client {} DISCONNECTED", client);
            })
            .subscribe();
}
Both clients send messages to each other through the server. The server code is
@MessageMapping("channel")
Flux<Message> channel(@Payload Flux<Message> messageFlux) {
    log.info("Received channel request...");
    return messageFlux
            .doOnNext(message1 -> System.out.println("Sender : " + message1.getSender() +
                    " Receiver : " + message1.getReceiver() +
                    " Content : " + message1.getContent()))
            .switchMap(message1 ->
                    clientsMap.get(message1.getReceiver())
                            .route("channel")
                            .data(Flux.just(message1))
                            .retrieveFlux(Message.class)
            )
            .log();
}
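and this is the Message class
public class Message {
    private String sender;
    private String receiver;
    private String content;

    public Message(String sender, String receiver, String content) {
        this.sender = sender;
        this.receiver = receiver;
        this.content = content;
    }

    // Getters used by the channel handler above
    public String getSender() { return sender; }
    public String getReceiver() { return receiver; }
    public String getContent() { return content; }
}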
I would like suggestions on my server code. The code simulates a single voice call.
Hi guys, can someone help me understand how the reconnect feature works? I have one scenario which does not work as I would expect.
I'm using Spring's RSocketRequester as a client. Consider the following code:
val requester = RSocketRequester.builder()
    .rsocketStrategies {
        it.decoder(Jackson2JsonDecoder())
        it.encoder(Jackson2JsonEncoder())
    }
    .rsocketConnector {
        it.reconnect(Retry.max(3).doBeforeRetry {
            runBlocking {
                delay(Duration.seconds(1))
                log.warn { "Unable to connect to Elliot Manager. Retrying... (${it.totalRetries()})" }
            }
        })
    }
    .dataMimeType(MediaType.APPLICATION_JSON)
    .setupData(ElliotSetupPayload("client-1"))
    .websocket(URI.create("ws://localhost:5000"))
// first call
log.info { "first call" }
try {
    val collection = requester
        .route("v1.resourceCollections.byName({name})", "polygon")
        .retrieveMono(CommonResourceCollectionResponse::class.java)
        .block()
    log.info { collection.toString() }
} catch (e: Exception) {
    // ignored so that the second call still runs
}
// second call
log.info { "second call" }
val collection2 = requester
    .route("v1.resourceCollections.byName({name})", "polygon")
    .retrieveMono(CommonResourceCollectionResponse::class.java)
    .block()
log.info { collection2.toString() }
When I run this while the server is down, the result is the following:
12:41:19.247 [main] INFO Client - first call
12:41:20.586 [reactor-tcp-epoll-2] WARN Client - Unable to connect to Elliot Manager. Retrying... (0)
12:41:21.600 [reactor-tcp-epoll-2] WARN Client - Unable to connect to Elliot Manager. Retrying... (1)
12:41:22.605 [reactor-tcp-epoll-2] WARN Client - Unable to connect to Elliot Manager. Retrying... (2)
12:41:22.608 [main] INFO Client - second call
Exception in thread "main" reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)
at reactor.util.retry.RetrySpec.lambda$static$2(RetrySpec.java:61)
...
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at ClientKt.main(Client.kt:149)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at ClientKt.main(Client.kt:160)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: localhost/127.0.0.1:5000
Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
at io.netty.channel.unix.Errors.newConnectException0(Errors.java:155)
at io.netty.channel.unix.Errors.handleConnectErrno(Errors.java:128)
at io.netty.channel.unix.Socket.finishConnect(Socket.java:320)
at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:710)
...
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Why doesn't the client try to reconnect 3 times during the second call? And how would you make it behave like this:
use the same connection while it is open (so both calls use the same session), and try to reconnect (not indefinitely) on every call when the connection is not open or could not be established?
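To make the desired behaviour concrete (reuse the connection while it is open, otherwise make a bounded number of connect attempts on every call), here is a minimal sketch in plain Java. It only illustrates the semantics being asked for and says nothing about how RSocketConnector.reconnect works internally. BoundedReconnect, its constructor parameters and get() are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Hypothetical holder: reuses a live connection, else retries a bounded number of times per call. */
final class BoundedReconnect<T> {
    private final Callable<T> connector; // opens a new connection, may throw
    private final Predicate<T> isOpen;   // tells whether a cached connection is still usable
    private final int maxAttempts;
    private final long delayMillis;
    private T current;

    BoundedReconnect(Callable<T> connector, Predicate<T> isOpen, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.connector = connector;
        this.isOpen = isOpen;
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    /** Returns the cached connection if open; otherwise starts a fresh round of attempts. */
    synchronized T get() {
        if (current != null && isOpen.test(current)) {
            return current;
        }
        current = null;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (attempt > 1) {
                    Thread.sleep(delayMillis); // pause between attempts, not before the first
                }
                current = connector.call();
                return current;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while reconnecting", e);
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException("Connect failed after " + maxAttempts + " attempts", last);
    }
}
```

The attempt counter lives inside get(), so a failed first call does not use up the budget of the second call: every call that finds no open connection gets its own maxAttempts tries.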
What about infinite reconnect combined with Kotlin coroutines? Let me demonstrate...
private fun <T> timeoutCall(timeout: Duration = Duration.seconds(3), call: suspend () -> T?): T? = runBlocking {
    try {
        withTimeout(timeout) {
            call()
        }
    } catch (e: TimeoutCancellationException) {
        log.warn { "RSocket call timed out." }
        null
    }
}
val requester = RSocketRequester.builder()
    .rsocketStrategies {
        it.decoder(Jackson2JsonDecoder())
        it.encoder(Jackson2JsonEncoder())
    }
    .rsocketConnector {
        it.reconnect(Retry.indefinitely().doBeforeRetry {
            runBlocking {
                delay(Duration.seconds(1))
                log.debug { "Unable to connect to Elliot Manager. Retrying... (${it.totalRetries()})" }
            }
        })
    }
    .dataMimeType(MediaType.APPLICATION_JSON)
    .setupData(ElliotSetupPayload("client-1"))
    .websocket(URI.create("ws://localhost:5000"))
// first call
log.info { "first call" }
timeoutCall {
    requester
        .route("v1.resourceCollections.byName({name})", "polygon")
        .retrieveAndAwaitOrNull<CommonResourceCollectionResponse>()
}?.let {
    log.info { it.toString() }
}
log.info { "little pause" }
runBlocking { delay(10000) }
log.info { "continuing" }
// second call
log.info { "second call" }
timeoutCall {
    requester
        .route("v1.resourceCollections.byName({name})", "polygon")
        .retrieveAndAwaitOrNull<CommonResourceCollectionResponse>()
}?.let {
    log.info { it.toString() }
}
log.info { "Finished" }
This will produce:
--> Server not running
13:55:35.964 [main] INFO Client - first call
13:55:39.017 [main] WARN Client - RSocket call timed out.
13:55:39.018 [main] INFO Client - little pause
--> Server started
13:55:49.021 [main] INFO Client - continuing
13:55:49.022 [main] INFO Client - second call
--> Reconnected
13:55:49.260 [main] INFO Client - CommonResourceCollectionResponse(id=61449f4b09394f7580cef144, type=TABLE, name=polygon, lockable=true)
13:55:49.261 [main] INFO Client - Finished
So it is working as expected. Is this the way to go, or am I missing something?
@Override
public Flux<Payload> requestStream(Payload payload) {
    // first flux
    return Flux.interval(Duration.ofMillis(1000))
            .map(time -> {
                System.out.println("Creating...");
                return DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now());
            });
    // second flux (alternative to the first; only one of the two returns can be active at a time)
    return Flux.create(fluxSink -> {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("Creating...");
                Thread.sleep(5000);
                fluxSink.next(DefaultPayload.create(String.format("Number %s", i)));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}
When I write this
this.requester.dispose();
I get this exception
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
Is there a way I can dispose of the connection on the RSocket server?
@BeforeAll
public static void setupOnce(@Autowired RSocketRequester.Builder builder, @Value("${spring.rsocket.server.port}") Integer port) {
    requester = builder.transport(WebsocketClientTransport.create(port));
}
rsocket-micrometer
on the CLIENT side, by configuring the RSocketConnector
with interceptors.@MessageMapping
) Spring Boot RSocket Controller.
Need advice on using reactive resources (e.g. reactive HTTP client, reactive Redis client, etc.) in doFirst() and doFinally() behaviours (side effects). E.g.
rsocketRequester.rsocket().onClose()
    .doFirst(() -> {
        httpClient.get().uri(serviceUrl)
            ...
            .subscribe(); // <--- ***
    })
    ...
    .doFinally(signalType -> {
        httpClient.get().uri(serviceUrl)
            ...
            .subscribe(); // <--- ***
    })
    .subscribe();
Basically, because of ***, these HTTP calls happen asynchronously and outside of the RSocket lifecycle. Could it be rewritten without these spawned subscribes and also be bound to the lifecycle? E.g. don't finish/dispose the RSocket subscribe() until the HTTP call is completed.
Thanks
@Configuration
@EnableRSocketSecurity
@EnableReactiveMethodSecurity
public class RSocketSecurityConfig {
    @Bean
    PayloadSocketAcceptorInterceptor authorization(RSocketSecurity security, FirebaseAuth firebaseAuth) {
        security.authorizePayload(authorize ->
                authorize
                        .anyExchange().authenticated()
        ).jwt(jwtSpec -> {
            jwtSpec.authenticationManager(new FirebaseAuthenticationManager(firebaseAuth));
        });
        return security.build();
    }
    @Bean
    RSocketMessageHandler messageHandler(RSocketStrategies strategies) {
        RSocketMessageHandler handler = new RSocketMessageHandler();
        handler.getArgumentResolverConfigurer().addCustomResolver(new AuthenticationPrincipalArgumentResolver());
        handler.setRSocketStrategies(strategies);
        return handler;
    }
}
@PreAuthorize("hasRole('USER')")
@MessageMapping("auth-test")
public Mono<Test> testMono(Test msg, @AuthenticationPrincipal UserDetails userDetails) {
    log.info("Got message {}", msg.getMessage());
    return Mono.just("Well hello")
            .map(s -> Test.newBuilder()
                    .setMessage(s)
                    .build()
            );
}
@Service
public class FirebaseUserDetailsService implements AuthenticationUserDetailsService<FirebaseAuthenticationToken> {
    @Override
    public UserDetails loadUserDetails(FirebaseAuthenticationToken token) throws UsernameNotFoundException {
        log.info("loadUserDetails gets called, authorities: {}", token.getAuthorities());
        return User.builder()
                .username(token.getName())
                .authorities(token.getAuthorities())
                .build();
    }
}
Hello everyone, is there any code example available for a Dart client (Flutter) and a Spring WebSocket server? I'm getting an error at the connection setup stage while using @Payload. Does anyone know what the issue is?
Dart client code (Dart package rsocket-dart)
RSocket rsocket = await RSocketConnector.create()
    .metadataMimeType('message/x.rsocket.routing.v0')
    .dataMimeType('text/plain')
    .setupPayload(Payload.fromText('text/plain', 'test data'))
    .connect('ws://10.0.2.2:6565/rsocket');
RSocket Java code
@ConnectMapping
public void onConnect(RSocketRequester requester, @Payload String payload) { ... /// }
Properties file
spring.rsocket.server.port=6565
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
Hello! I got quite stuck figuring out how to control the requestN value from my Spring Boot application.
The context: I am using an rsocket channel. I would like to limit the number of messages per connection the backend receives at a time (those can be quite bulky).
What I do:
rateLimit in my @MessageMapping handler
Nevertheless, the very first requestN I see in the logs looks like this:
2022-03-16 17:23:44.498 DEBUG 2265 --- [ctor-http-nio-7] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 3 Type: REQUEST_N Flags: 0b0 Length: 10 RequestN: 31
I have no idea where 31 comes from!
Any hints much appreciated!
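For background on what a REQUEST_N frame carries: it is Reactive Streams demand, i.e. the n a subscriber passes to Subscription.request(n). The sketch below shows that mechanism with the JDK's java.util.concurrent.Flow interfaces only, not with RSocket or Reactor, and it does not explain where the 31 in the log comes from. RequestNDemo, RangePublisher and BatchSubscriber are hypothetical names, and the publisher skips the validation of n that a full implementation would need.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical demo of request(n) demand using plain java.util.concurrent.Flow. */
final class RequestNDemo {

    /** Emits 0..count-1 synchronously, never more than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext: the loop below picks it up
                    emitting = true;
                    while (demand > 0 && next < count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests one batch up front and another batch each time the current one is consumed. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        final List<Long> requests = new ArrayList<>();    // every n passed to request(n)
        final List<Integer> received = new ArrayList<>(); // every item delivered
        private final long batch;
        private long outstanding;
        private Flow.Subscription subscription;

        BatchSubscriber(long batch) { this.batch = batch; }

        private void requestBatch() {
            outstanding = batch; // set before request(), which may emit synchronously
            requests.add(batch);
            subscription.request(batch);
        }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; requestBatch(); }

        @Override public void onNext(Integer item) {
            received.add(item);
            if (--outstanding == 0) requestBatch();
        }

        @Override public void onError(Throwable t) { }

        @Override public void onComplete() { }
    }
}
```

Subscribing a BatchSubscriber with batch 4 to a RangePublisher of 10 items records three requests of 4 each and delivers all 10 items. Operators that prefetch or replenish early signal different numbers, so the n seen on the wire depends on every operator between the handler and the transport.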
spring-boot-starter-rsocket
@MessageMapping("...")
rsocket channels. Is this a thing that exists? Or do I need to record and publish metrics myself? Or maybe this is the wrong place to ask?
clientRSocket = RSocketConnector.create()
    .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.string)
    .dataMimeType(WellKnownMimeType.APPLICATION_JSON.string)
    .setupPayload(DefaultPayload.create("ttest"))
    .reconnect(Retry.fixedDelay(36, Duration.ofSeconds(10)))
    .connect(transport)
    .doOnSuccess {
        Log.e("TAG", "Success")
    }
    .doOnCancel {
        Log.e("TAG", "Cancel")
    }
    .doOnError {
        Log.e("TAG", "Error")
    }
    .doFinally {
        Log.e("TAG", "Finally")
    }
    .block()