io.rsocket.exceptions.RejectedSetupException: Access Denied
when firing a request with the RSocket Client CLI. This error seems to be on the server side. Has anyone done this tutorial?
io.rsocket.exceptions.ApplicationErrorException: No handler for destination ‘’
Hi. I have a question - how do I tell RSocket to establish the connection (call the setup route endpoint) without invoking any of the other endpoints?
RSocketRequester requester = builder
    .setupRoute("/connect")
    .setupData(agentId)
    .rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
    .dataMimeType(MediaType.APPLICATION_JSON)
    .transport(WebsocketClientTransport.create(httpClient, "/rsocket"));
At this point the connection is not actually established. How do I force it to connect to the backend?
Thanks.
Hi. Question about shutting down Spring context with Flux.interval() running.
I have the following code that pushes telemetry over RSocket at a given interval.
@Component
public class BackendService {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final String id = "test1";
    private final String backendUrl = "ws://localhost:9898";
    private final RSocketRequester requester;
    private final Disposable telemetrySubscriber;

    @PreDestroy
    public void destroy() {
        telemetrySubscriber.dispose();
    }

    public BackendService(RSocketRequester.Builder builder, RSocketStrategies strategies) {
        logger.info("id: {}, backend URL: {}", id, backendUrl);
        HttpClient httpClient = HttpClient.create()
            .baseUrl(backendUrl);
        requester = builder
            .setupRoute("/backend/connect")
            .setupData(id)
            .rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
            .dataMimeType(MediaType.APPLICATION_JSON)
            .transport(WebsocketClientTransport.create(httpClient, "/rsocket"));
        telemetrySubscriber = Flux.interval(Duration.ofSeconds(10))
            .onBackpressureDrop()
            .map(tick -> "Telemetry from " + id + " tick " + tick)
            .as(flux -> requester.route("/backend/telemetry").data(flux).retrieveFlux(Void.class))
            .subscribe();
    }
}
It works fine in general, but I have noticed that when I shut down the Spring context where this component is defined, it doesn't shut down completely: a bunch of Reactor threads are still running (e.g. reactor-http-nio-X) and it seems that Flux.interval() is still active. The following exception is printed at the given interval:
java.util.concurrent.RejectedExecutionException: event executor terminated
at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
The complete code is here: https://github.com/maximdim/rsocket2/blob/lifecycle/src/main/java/rsocket/client/BackendService.java
I would appreciate any hints. Thanks!
@OlegDokuka I have a conceptual question about RSocket and security. I'm aware that there is Spring Security for RSocket, but let's assume that for my use case it's overkill. I want to pass credentials as part of the setup route and do authentication in the @ConnectMapping handler:
@ConnectMapping
Mono<Void> handleSetup(@Payload String payload) {
    boolean isAuthenticated = ... //my authentication code.
    if (!isAuthenticated) {
        return Mono.error(new RejectedSetupException("connection is not authenticated"));
    }
    ...
Now what should I do to access the caller's identity in a regular @MessageMapping handler? I don't want to rely on anything that the client passes into that handler, as authentication has already happened in @ConnectMapping, but I still need to know the caller's identity somehow. It's part of the same connection after all.
Thanks!
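One way this is often approached without Spring Security is to remember the identity at setup time, keyed by the connection handle, and look it up again in later handlers. In Spring that handle could plausibly be the RSocketRequester argument that handler methods can receive, but that wiring is an assumption here and is not shown. The plain-Java sketch below only illustrates the bookkeeping; `ConnectionIdentityRegistry` and its methods are hypothetical names, not part of RSocket or Spring.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of RSocket or Spring: maps a connection handle
// to the identity that was authenticated during setup.
final class ConnectionIdentityRegistry<C> {
    private final Map<C, String> identities = new ConcurrentHashMap<>();

    // Call from the setup handler once authentication has succeeded.
    void register(C connection, String identity) {
        identities.put(connection, identity);
    }

    // Call from request handlers; empty means this connection never authenticated.
    Optional<String> identityOf(C connection) {
        return Optional.ofNullable(identities.get(connection));
    }

    // Call when the connection closes so that entries do not leak.
    void unregister(C connection) {
        identities.remove(connection);
    }
}
```

The request handler then trusts only what the registry returns for its own connection, never anything in the request payload.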
Hopefully this is the right place to ask for help with this. I'm having trouble connecting to my server (hosted on Heroku) from my local shell client.
In my server logs everything starts up normally, as expected:
2021-02-26T21:16:22.837212+00:00 app[web.1]: 2021-02-26 21:16:22.836 INFO 4 --- [ main] o.s.b.rsocket.netty.NettyRSocketServer : Netty RSocket started on port(s): 7000
When I try to connect from my shell client, I get the following error:
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: no further information: nuchess-lobby.herokuapp.com/52.72.160.125:7000
Here's a pastebin with the full log: https://pastebin.com/gi05zu9r
Heroku claims to support WebSocket (not TCP) on their free tier, so I configured the server to use WS instead of TCP, and both clients connect to the WS port properly locally. So I'm not sure what I'm missing here.
When the connection fails in my shell client, there's no error or anything of that kind in the server logs either.
Is there symmetrical support for keepAlive? The spec says:
Unsigned 31-bit integer of Time (in milliseconds) that a client will allow a server to not respond to a KEEPALIVE before it is assumed to be dead
but I wonder whether the server can also use this to tell if a certain client is dead. Any idea how to solve this (without reimplementing keep-alive myself using payloads)?
There is a deprecated ServerKeepAliveSupport, but that's all I found on this topic...
Currently, if the client dies, the server doesn't notice.
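For illustration, the timeout rule quoted from the spec can be applied from either side: a peer is presumed dead once no KEEPALIVE has arrived within the maximum lifetime. The sketch below shows only that rule in plain Java; `KeepAliveWatch` is a hypothetical name and this is not how rsocket-java implements it.

```java
// Hypothetical sketch of the keep-alive timeout rule only; not rsocket-java code.
final class KeepAliveWatch {
    private final long maxLifetimeMillis;
    private volatile long lastKeepAliveMillis;

    // nowMillis is the time the connection was established.
    KeepAliveWatch(long maxLifetimeMillis, long nowMillis) {
        if (maxLifetimeMillis <= 0) {
            throw new IllegalArgumentException("maxLifetimeMillis must be positive");
        }
        this.maxLifetimeMillis = maxLifetimeMillis;
        this.lastKeepAliveMillis = nowMillis;
    }

    // Record that a KEEPALIVE frame arrived from the peer.
    void onKeepAlive(long nowMillis) {
        lastKeepAliveMillis = nowMillis;
    }

    // The peer is presumed dead once it has been silent for longer than the max lifetime.
    boolean isPeerDead(long nowMillis) {
        return nowMillis - lastKeepAliveMillis > maxLifetimeMillis;
    }
}
```

Timestamps are passed in rather than read from the clock so the rule can be checked deterministically.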
org.springframework.messaging.rsocket.RSocketRequester
) to rscocket transport.AbstractJackson2Encoder
) have either the data payload or the metadata, but not both.
Hi guys! I've been working on this project https://github.com/ivangfr/springboot-rsocket-webflux-aop where I use RSocket.
The project is composed of 2 clients (movie-client-shell and movie-client-ui) and one server (movie-server).
One problem that I see in my solution is that, once the server restarts, the client movie-client-ui doesn't connect to it anymore. For sure, I need to configure some retry mechanism.
How could I implement it in this class https://github.com/ivangfr/springboot-rsocket-webflux-aop/blob/master/movie-client-ui/src/main/java/com/mycompany/movieclientui/config/MovieServerRSocketConfig.java#L54?
Thanks in advance!
Hi @OlegDokuka, one quick question. Is there currently any built-in support to find idle connections and dispose of them?
By idle connection I mean one where the keep-alive passes, but the requester has not sent any requests for a while, e.g. 5 minutes.
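In case it helps frame the question, a hand-rolled version of this could record the time of the last request per connection and periodically sweep for entries older than the idle timeout. The sketch below is plain Java under that assumption; `IdleConnectionTracker` is a hypothetical name, and actually closing the connection (for example by disposing it) is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not an RSocket API: tracks the last request time per connection.
final class IdleConnectionTracker<C> {
    private final Map<C, Long> lastRequestMillis = new ConcurrentHashMap<>();
    private final long idleTimeoutMillis;

    IdleConnectionTracker(long idleTimeoutMillis) {
        if (idleTimeoutMillis <= 0) {
            throw new IllegalArgumentException("idleTimeoutMillis must be positive");
        }
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Call on connect and on every incoming request (keep-alive frames do not count).
    void touch(C connection, long nowMillis) {
        lastRequestMillis.put(connection, nowMillis);
    }

    // Call when a connection closes on its own.
    void remove(C connection) {
        lastRequestMillis.remove(connection);
    }

    // Removes and returns every connection idle for at least the timeout.
    List<C> removeIdle(long nowMillis) {
        List<C> idle = new ArrayList<>();
        for (Map.Entry<C, Long> entry : lastRequestMillis.entrySet()) {
            boolean expired = nowMillis - entry.getValue() >= idleTimeoutMillis;
            // remove(key, value) only succeeds if no newer touch() raced with the sweep.
            if (expired && lastRequestMillis.remove(entry.getKey(), entry.getValue())) {
                idle.add(entry.getKey());
            }
        }
        return idle;
    }
}
```

A scheduler would call `removeIdle` at some interval and close whatever it returns.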
Sorry, accidentally clicked the report button and the message disappeared 🤦♂️🙇♂️
Hey, I am quite new to RSocket and I am trying to work on a POC. The use case I am working on needs a one-to-many connection between client and servers. I have a single client which will connect to multiple servers using request-stream. Every server will send some information to the client. The client will need to aggregate all the information and log it. If an example implementation can be provided it will be of great help. Thanks!!!! :)
@OlegDokuka Hey Oleg, is this something you can help with or advise on? I basically need a stream between a client and multiple servers. After the client sends a request, all the servers will start streaming information to the client. Any help or a pointer in the right direction is appreciated. :)
12:25:02.285 ERROR [correlationId=test123][reactor-http-nio-4] HttpExceptionHandler - Unhandled exception: CanceledException: THE PAYLOAD IS TOO BIG TO BE SEND AS A SINGLE FRAME WITH A MAX FRAME LENGTH 16777215. CONSIDER ENABLING FRAGMENTATION.
io.rsocket.exceptions.CanceledException: The payload is too big to be send as a single frame with a max frame length 16777215. Consider enabling fragmentation.
at io.rsocket.exceptions.Exceptions.from(Exceptions.java:80)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Handler og.some.spring.controller.SomeController#execute(params) [DispatcherHandler]
Stack trace:
at io.rsocket.exceptions.Exceptions.from(Exceptions.java:80)
at io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:255)
at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:205)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248)
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:132)
requester.route(route).metadata(metadata, APPLICATION_JSON).data(request).retrieveMono(VoidRef) to requester.route(route).metadata(metadata, APPLICATION_JSON).data(request).send(). Any ideas why this could be happening? Thanks in advance!
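For context on the number in that message: 16777215 is 2^24 - 1, the largest length a 24-bit frame length field can carry. With fragmentation enabled, a larger payload is cut into pieces no bigger than a configured MTU. In rsocket-java this is, to my knowledge, switched on with `fragment(mtu)` on the connector or server builder. The plain-Java sketch below only illustrates the splitting idea. `FragmentationSketch` and `split` are hypothetical names, and real fragments also carry frame headers that are omitted here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of payload splitting; not rsocket-java's implementation.
final class FragmentationSketch {
    // Largest value a 24-bit length field can hold: 2^24 - 1 = 16777215.
    static final int MAX_FRAME_LENGTH = (1 << 24) - 1;

    // Splits the payload into consecutive chunks of at most mtu bytes each.
    static List<byte[]> split(byte[] payload, int mtu) {
        if (mtu <= 0) {
            throw new IllegalArgumentException("mtu must be positive");
        }
        List<byte[]> fragments = new ArrayList<>();
        int offset = 0;
        while (offset < payload.length) {
            int length = Math.min(mtu, payload.length - offset);
            fragments.add(Arrays.copyOfRange(payload, offset, offset + length));
            offset += length;
        }
        return fragments;
    }
}
```

Concatenating the returned chunks in order yields the original payload, which is what the receiving side would do during reassembly.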