    alwaystheobvious
    @alwaystheobvious
    if it's possible ... ofc
    Bazzinga
    @Vadim_S_1_twitter
    Hi all. I cannot find information on how to configure TLS in spring-boot-starter-rsocket. I found some examples, but they are not for Spring Boot: https://github.com/reactor/reactor-netty/blob/main/reactor-netty-examples/src/main/java/reactor/netty/examples/http/helloworld/HelloWorldServer.java
    5 replies
    Pavel Grigorenko
    @MastaP

    Hi, we have a use case where the server needs to reject a connecting RSocket client if it has the same ID as an already connected client.
    So far we’ve used @ConnectMapping to register clients, and this method also returns Mono.error(RejectedSetupException()) in case of a duplicate.
    Now, on the client side, the rejected client receives two exceptions:

    ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
    reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
    Caused by: java.util.concurrent.CancellationException: Disposed
        at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545)
        at io.rsocket.transport.netty.WebsocketDuplexConnection.doOnClose(WebsocketDuplexConnection.java:72)
        at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30)
    ...
        at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
        at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51)

    and

    reactor.core.Exceptions$ErrorCallbackNotImplemented: RejectedSetupException (0x3): MPC Gateway with ID 'mpc1' already connected
    Caused by: io.rsocket.exceptions.RejectedSetupException: MPC Gateway with ID 'mpc1' already connected
        at io.rsocket.exceptions.Exceptions.from(Exceptions.java:62)
        at io.rsocket.core.RSocketRequester.lambda$tryTerminateOnZeroError$4(RSocketRequester.java:313)
        at io.rsocket.core.RSocketRequester.tryTerminate(RSocketRequester.java:318)
        at io.rsocket.core.RSocketRequester.tryTerminateOnZeroError(RSocketRequester.java:313)
        at io.rsocket.core.RSocketRequester.handleStreamZero(RSocketRequester.java:224)
        at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:209)
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
        at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248)

    Both are ErrorCallbackNotImplemented.
    Question: which is the correct place to hook into in order to handle these errors? Maybe we’re missing something basic here.
    Another question is, should we look into interceptors instead of @ConnectMapping for the use case above?
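
A minimal sketch of the duplicate-ID check described in the question above, kept independent of RSocket and Spring so that it runs on its own. `ConnectedClients`, `tryRegister`, and `unregister` are hypothetical names, not library API. In the real handler the stored value would presumably be the `RSocketRequester`, and a `false` result is the point where the `@ConnectMapping` method would return its `RejectedSetupException`. The sketch does not address where the client should handle the resulting errors.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper (not part of RSocket or Spring): tracks connected client IDs.
final class ConnectedClients<C> {
    private final ConcurrentMap<String, C> byId = new ConcurrentHashMap<>();

    /** Returns true if the ID was free and is now registered, false on a duplicate. */
    boolean tryRegister(String clientId, C connection) {
        // putIfAbsent is atomic, so two clients racing for the same ID cannot both win.
        return byId.putIfAbsent(clientId, connection) == null;
    }

    /** Removes the entry only if it still belongs to the given connection. */
    boolean unregister(String clientId, C connection) {
        return byId.remove(clientId, connection);
    }

    int size() {
        return byId.size();
    }

    public static void main(String[] args) {
        ConnectedClients<String> clients = new ConnectedClients<>();
        System.out.println(clients.tryRegister("mpc1", "connection-A")); // true
        System.out.println(clients.tryRegister("mpc1", "connection-B")); // false (duplicate)
        System.out.println(clients.unregister("mpc1", "connection-B"));  // false (not the owner)
        System.out.println(clients.unregister("mpc1", "connection-A"));  // true
    }
}
```

The two-argument `remove(key, value)` matters here: when the rejected duplicate's connection closes, its cleanup must not evict the entry of the client that registered first.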

    11 replies
    khizar khan
    @KhanKhankhel25_twitter
    @OlegDokuka is there a way I can connect two clients? For example, one client sends a request to the server asking to connect to another client; the server keeps track of all connected clients, calls the second client, and connects client1 with client2. Data from client1 to client2 then passes through the server bidirectionally. Any pointers would help. Thanks.
    3 replies
    khizar khan
    @KhanKhankhel25_twitter
    Hey, can I create a bidirectional channel between two clients through the server?
    khizar khan
    @KhanKhankhel25_twitter
    What is this error?
    reactor.core.Exceptions$ErrorCallbackNotImplemented: ApplicationErrorException (0x201): Destination 'channel' does not support REQUEST_STREAM. Supported interaction(s): [REQUEST_CHANNEL]
    4 replies
    khizar khan
    @KhanKhankhel25_twitter

    Please suggest a change in the code. The code is working as I want it to. There are two
    RSocket clients developed with Spring RSocket (Java) and one Spring RSocket server. The flow is: the server is running and stores connected clients in a map using

    private final List<RSocketRequester> CLIENTS = new ArrayList<>();
    private final Map<String, RSocketRequester> clientsMap = new HashMap<>();
    
    @ConnectMapping("shell-client")
        void connectShellClientAndAskForTelemetry(RSocketRequester requester,
                                                  @Payload String client) {
    
            requester.rsocket()
                    .onClose()
                    .doFirst(() -> {
                        // Add all new clients to a client list
                        log.info("Client: {} CONNECTED.", client);
                        CLIENTS.add(requester);
                        clientsMap.put(client, requester);
                    })
                    .doOnError(error -> {
                        // Warn when channels are closed by clients
                        log.warn("Channel to client {} CLOSED", client);
                    })
                    .doFinally(consumer -> {
                        // Remove disconnected clients from the client list
                        CLIENTS.remove(requester);
                        clientsMap.remove(client);
                        log.info("Client {} DISCONNECTED", client);
                    })
                    .subscribe();
    }

    Both clients send messages to each other through the server. The server code is:

    @MessageMapping("channel")
        Flux<Message> channel(@Payload Flux<Message> messageFlux) {
            log.info("Received channel request...");
    
    
            return messageFlux
                        .doOnNext(message1 -> System.out.println("Sender : " + message1.getSender() +
                                " Receiver : " + message1.getReceiver() +
                                " Content : " + message1.getContent()))
                        .switchMap(message1 ->
                            clientsMap.get(message1.getReceiver())
                                    .route("channel")
                                    .data(Flux.just(message1))
                                    .retrieveFlux(Message.class)
                        )
                        .log();  
        }

    And this is the Message class:

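    public class Message {
    
        private String sender;
        private String receiver;
        private String content;
    
        public Message(String sender, String receiver, String content){
            this.sender = sender;
            this.receiver = receiver;
            this.content = content;
        }
    
        // Getters used by the server's channel handler above
        public String getSender() { return sender; }
        public String getReceiver() { return receiver; }
        public String getContent() { return content; }
    }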

    I would like suggestions on my server code. The code simulates a single voice call.
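
One possible suggestion, as a hedged sketch rather than a reviewed fix: the handler above reads a plain `HashMap` from callbacks that may run on different threads, and `clientsMap.get(...)` returns `null` when the receiver is not connected, which would make the following `.route("channel")` call fail. The stand-alone class below shows a thread-safe lookup that makes the "receiver not connected" case explicit. `ReceiverLookup` and its method names are hypothetical; `R` stands in for `RSocketRequester`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the server's clientsMap; R would be RSocketRequester in the real handler.
final class ReceiverLookup<R> {
    private final Map<String, R> clients = new ConcurrentHashMap<>();

    void connected(String name, R requester) {
        clients.put(name, requester);
    }

    void disconnected(String name, R requester) {
        // Remove only if the entry still points at this requester.
        clients.remove(name, requester);
    }

    Optional<R> find(String receiver) {
        // ConcurrentHashMap rejects null keys, so guard before the lookup.
        return receiver == null ? Optional.empty() : Optional.ofNullable(clients.get(receiver));
    }

    public static void main(String[] args) {
        ReceiverLookup<String> lookup = new ReceiverLookup<>();
        lookup.connected("client2", "requester-2");
        System.out.println(lookup.find("client2").isPresent()); // true
        System.out.println(lookup.find("client3").isPresent()); // false
    }
}
```

With an `Optional` result, the channel handler can decide what an absent receiver means (drop the message, answer with an error, or complete the stream) instead of failing on a `null`.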

    Jan Tobola
    @jantobola

    Hi guys, can someone help me understand how the reconnect feature works? I have one scenario which does not work as I would expect.
    I'm using Spring's RSocketRequester as a client. Consider the following code:

    val requester = RSocketRequester.builder()
        .rsocketStrategies {
            it.decoder(Jackson2JsonDecoder())
            it.encoder(Jackson2JsonEncoder())
        }
        .rsocketConnector {
            it.reconnect(Retry.max(3).doBeforeRetry {
                runBlocking {
                    delay(Duration.seconds(1))
                    log.warn { "Unable to connect to Elliot Manager. Retrying... (${it.totalRetries()})" }
                }
            })
        }
        .dataMimeType(MediaType.APPLICATION_JSON)
        .setupData(ElliotSetupPayload("client-1"))
        .websocket(URI.create("ws://localhost:5000"))
    
    // first call
    log.info { "first call" }
    try {
        val collection = requester
            .route("v1.resourceCollections.byName({name})", "polygon")
            .retrieveMono(CommonResourceCollectionResponse::class.java)
            .block()
        log.info { collection.toString() }
    } catch (e: Exception) {
    
    }
    
    // second call
    log.info { "second call" }
    val collection2 = requester
        .route("v1.resourceCollections.byName({name})", "polygon")
        .retrieveMono(CommonResourceCollectionResponse::class.java)
        .block()
    log.info { collection2.toString() }

    When I run this while the server is down, the result is the following:

    12:41:19.247 [main] INFO Client - first call
    12:41:20.586 [reactor-tcp-epoll-2] WARN Client - Unable to connect to Elliot Manager. Retrying... (0)
    12:41:21.600 [reactor-tcp-epoll-2] WARN Client - Unable to connect to Elliot Manager. Retrying... (1)
    12:41:22.605 [reactor-tcp-epoll-2] WARN Client - Unable to connect to Elliot Manager. Retrying... (2)
    12:41:22.608 [main] INFO Client - second call
    Exception in thread "main" reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
        at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)
        at reactor.util.retry.RetrySpec.lambda$static$2(RetrySpec.java:61)
        ...
        Suppressed: java.lang.Exception: #block terminated with an error
            at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
            at ClientKt.main(Client.kt:149)
        Suppressed: java.lang.Exception: #block terminated with an error
            at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
            at ClientKt.main(Client.kt:160)
    Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: localhost/127.0.0.1:5000
    Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
        at io.netty.channel.unix.Errors.newConnectException0(Errors.java:155)
        at io.netty.channel.unix.Errors.handleConnectErrno(Errors.java:128)
        at io.netty.channel.unix.Socket.finishConnect(Socket.java:320)
        at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:710)
            ...
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

    Why does the client not try to reconnect 3 times during the second call? And how would you get the following behavior?
    Use the same connection when the connection is open (so both calls use the same session) and try to reconnect (not indefinitely) on every call when the connection is not open or could not be established.
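
The desired semantics (a fresh, bounded retry budget for every call) can be sketched without any RSocket types. This does not explain why the connector-level reconnect behaves as shown in the log; it only illustrates the behavior asked for. `PerCallRetry` and `withRetries` are hypothetical names. In Reactor the closest analogue would presumably be applying `retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1)))` to each request `Mono`, which is untested here.

```java
import java.util.function.Supplier;

// Hypothetical helper: every invocation gets its own retry budget.
final class PerCallRetry {

    /** Runs the call, retrying up to maxRetries times after a failure, pausing delayMillis between attempts. */
    static <T> T withRetries(Supplier<T> call, int maxRetries, long delayMillis) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and fall through to the pause
            }
            if (attempt < maxRetries) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
        throw last; // budget exhausted: surface the last failure
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Simulated call that fails twice (server down) and then succeeds.
        String result = withRetries(() -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("connection refused");
            }
            return "ok";
        }, 3, 10L);
        System.out.println(result + " after " + attempts[0] + " attempts"); // ok after 3 attempts
    }
}
```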

    2 replies
    Jan Tobola
    @jantobola

    What about infinite reconnect in combination with Kotlin coroutines? Let me demonstrate...

    private fun <T> timeoutCall(timeout: Duration = Duration.seconds(3), call: suspend () -> T?): T? = runBlocking {
        try {
            withTimeout(timeout) {
                call()
            }
        } catch (e: TimeoutCancellationException) {
            log.warn { "RSocket call timed out." }
            null
        }
    }
    
    val requester = RSocketRequester.builder()
        .rsocketStrategies {
            it.decoder(Jackson2JsonDecoder())
            it.encoder(Jackson2JsonEncoder())
        }
        .rsocketConnector {
            it.reconnect(Retry.indefinitely().doBeforeRetry {
                runBlocking {
                    delay(Duration.seconds(1))
                    log.debug { "Unable to connect to Elliot Manager. Retrying... (${it.totalRetries()})" }
                }
            })
        }
        .dataMimeType(MediaType.APPLICATION_JSON)
        .setupData(ElliotSetupPayload("client-1"))
        .websocket(URI.create("ws://localhost:5000"))
    
    log.info { "first call" }
    timeoutCall {
        requester
            .route("v1.resourceCollections.byName({name})", "polygon")
            .retrieveAndAwaitOrNull<CommonResourceCollectionResponse>()
    }?.let {
        log.info { it.toString() }
    }
    
    log.info { "little pause" }
    runBlocking { delay(10000) }
    log.info { "continuing" }
    
    // second call
    log.info { "second call" }
    timeoutCall {
        requester
            .route("v1.resourceCollections.byName({name})", "polygon")
            .retrieveAndAwaitOrNull<CommonResourceCollectionResponse>()
    }?.let {
        log.info { it.toString() }
    }
    
    log.info { "Finished" }

    This will produce:

     --> Server not running
    13:55:35.964 [main] INFO Client - first call
    13:55:39.017 [main] WARN Client - RSocket call timed out.
    13:55:39.018 [main] INFO Client - little pause
     --> Server started
    13:55:49.021 [main] INFO Client - continuing
    13:55:49.022 [main] INFO Client - second call
     --> Reconnected
    13:55:49.260 [main] INFO Client - CommonResourceCollectionResponse(id=61449f4b09394f7580cef144, type=TABLE, name=polygon, lockable=true)
    13:55:49.261 [main] INFO Client - Finished

    So it is working as expected. Is this the way to go, or am I missing something?

    Greg Adams
    @gadams00
    When using spring-boot-starter-rsocket is there a way to send response payload metadata from a MessageMapping-annotated method?
    4 replies
    FlumeEzra
    @FlumeEzra
    Hey everyone!
        @Override
        public Flux<Payload> requestStream(Payload payload) {
            //first flux
            return Flux.interval(Duration.ofMillis(1000))
                    .map(time -> {
                        System.out.println("Creating...");
                       return DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now());
                    });
    
            //second flux
            return Flux.create(fluxSink -> {
                for (int i = 0; i < 10; i++) {
                    try {
                        System.out.println("Creating...");
                        Thread.sleep(5000);
                        fluxSink.next(DefaultPayload.create(String.format("Number %s", i)));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    Why does the first flux create one element and send it to the client right away, while the second one generates all 10 elements and only then sends them to the client? Also, how would I go about doing something similar to the first flux? Please help.
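
A likely explanation, offered with some caution: the second flux calls `Thread.sleep` inside the `Flux.create` callback, so the loop blocks whichever thread subscribes, and if that thread is also responsible for writing to the connection, nothing can go out until the loop ends. The stand-alone sketch below shows the non-blocking alternative using only the JDK: emissions are scheduled on a timer and the caller returns immediately. `TimedEmitter` and `emit` are hypothetical names. In Reactor the equivalents would presumably be `Flux.interval(...)`, as in the first flux, or moving a blocking producer off the calling thread with `subscribeOn(Schedulers.boundedElastic())`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: produces elements on a timer instead of sleeping in the caller's thread.
final class TimedEmitter {

    /** Schedules count emissions, one per period, and returns at once with a latch that opens after the last one. */
    static CountDownLatch emit(ScheduledExecutorService scheduler, int count,
                               long periodMillis, Consumer<String> sink) {
        CountDownLatch done = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            int n = i; // effectively final copy for the lambda
            scheduler.schedule(() -> {
                sink.accept("Number " + n);
                done.countDown();
            }, periodMillis * (i + 1), TimeUnit.MILLISECONDS);
        }
        return done;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = emit(scheduler, 3, 100L, s -> System.out.println("Received " + s));
        System.out.println("emit() returned without waiting for the elements");
        done.await();
        scheduler.shutdown();
    }
}
```

Each element reaches the consumer as soon as it is produced, because the producing code never holds the calling thread hostage.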
    32 replies
    khizar khan
    @KhanKhankhel25_twitter

    When I write this

    this.requester.dispose();

    I get this exception

    reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
    Caused by: java.util.concurrent.CancellationException: Disposed

    Is there a way I can dispose of a connection on the RSocket server?

    12 replies
    rajkumarbhaji
    @rajkumarbhaji
    Hi, I use RSocketRequester metadata to pass a bearer token: .metadata(token, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE), but BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE is deprecated. What is the alternative we should use to pass a bearer token?
    1 reply
    Tobsch
    @schneider:synyx.de
    Hey, if I have an integration test based on RSocketRequester.Builder and hook onto the port via

    @BeforeAll
    public static void setupOnce(@Autowired RSocketRequester.Builder builder,
                                 @Value("${spring.rsocket.server.port}") Integer port) {
        requester = builder.transport(WebsocketClientTransport.create(port));
    }

    how can I release the port after the test? .dispose() does not work, so other tests after this one cannot bind the port again.
    1 reply
    Tobsch
    @schneider:synyx.de
    Is the way to go to create an RSocketServer for every test class?
    rajkumarbhaji
    @rajkumarbhaji
    How can I use Spring Cloud Gateway for RSocket microservices?
    Eric Turley
    @ericjturley
    I've set up rsocket metrics using rsocket-micrometer on the CLIENT side, by configuring the RSocketConnector with interceptors.
    But on the server side, I'm using an annotated (@MessageMapping) Spring Boot RSocket Controller.
    How do I add interceptors on the server side?
    rajkumarbhaji
    @rajkumarbhaji
    How can I use the Spring Cloud API gateway for multiple RSocket servers?
    Eugene Kuleshov
    @ekuleshov

    I need advice on using reactive resources (e.g. a reactive HTTP client, a reactive Redis client, etc.) in doFirst() and doFinally() behaviours (side effects). E.g.

    rsocketRequester.rsocket().onClose()
      .doFirst(() -> {
         httpClient.get().uri(serviceUrl)
           ...
           .subscribe(); // <--- ***
      })
      ...
      .doFinally(signalType -> {
        httpClient.get().uri(serviceUrl)
           ...
           .subscribe(); // <--- ***
      })
      .subscribe();

    Basically, because of ***, these HTTP calls happen asynchronously and outside of the RSocket lifecycle. Could this be rewritten without these spawned subscribes and also be bound to the lifecycle? E.g. don't finish/dispose the RSocket subscribe() until the HTTP call is completed.
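
The idea of composing the cleanup call into the chain, instead of spawning a detached `subscribe()`, can be illustrated with the JDK's `CompletableFuture`. This is an analogy under stated assumptions, not the Reactor or RSocket API: `LifecycleBoundCleanup` and `closeThenCleanup` are hypothetical names, `closed` stands in for `onClose()`, and `cleanup` stands in for the HTTP call. In Reactor the corresponding composition would presumably use operators such as `then(...)` on the `onClose()` `Mono`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper: the returned future is bound to both the close signal and the cleanup call.
final class LifecycleBoundCleanup {

    /** Completes only after closed has terminated (normally or with an error) AND the cleanup it triggers has completed. */
    static CompletableFuture<Void> closeThenCleanup(CompletableFuture<Void> closed,
                                                    Supplier<CompletableFuture<Void>> cleanup) {
        return closed
                .handle((value, error) -> (Void) null)   // run cleanup on success and on failure alike
                .thenCompose(ignored -> cleanup.get());  // chain the cleanup instead of firing and forgetting
    }

    public static void main(String[] args) {
        CompletableFuture<Void> closed = new CompletableFuture<>();
        CompletableFuture<Void> cleanupCall = new CompletableFuture<>();
        CompletableFuture<Void> all = closeThenCleanup(closed, () -> cleanupCall);

        closed.complete(null);
        System.out.println(all.isDone()); // false: cleanup is still running
        cleanupCall.complete(null);
        System.out.println(all.isDone()); // true: close and cleanup both finished
    }
}
```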

    Thanks

    2 replies
    Eugene Kuleshov
    @ekuleshov
    @OlegDokuka do you have any recommendations for a throttling proxy or similar test tools to automate testing of RSocket connections with bad connectivity?
    1 reply
    FlumeEzra
    @FlumeEzra
    Guys, I need help. I've arrived at a point where there's just hopelessness. I'm trying to set up a Spring Boot RSocket server with Firebase authentication. I include a Firebase Auth JWT in the setup payload metadata and it gets verified by the Firebase Admin SDK. This part works so far: the connection is authenticated now, so there's no need for further authentication. But I need to authorize requests, and I'd like to do it with @PreAuthorize, but I couldn't find a way to do so. Please help me.
    50 replies
    @Configuration
    @EnableRSocketSecurity
    @EnableReactiveMethodSecurity
    public class RSocketSecurityConfig {
    
        @Bean
        PayloadSocketAcceptorInterceptor authorization(RSocketSecurity security, FirebaseAuth firebaseAuth) {
            security.authorizePayload(authorize ->
                    authorize
                            .anyExchange().authenticated()
            ).jwt(jwtSpec -> {
                jwtSpec.authenticationManager(new FirebaseAuthenticationManager(firebaseAuth));
            });
    
            return security.build();
        }
    
    
        @Bean
        RSocketMessageHandler messageHandler(RSocketStrategies strategies) {
            RSocketMessageHandler handler = new RSocketMessageHandler();
            handler.getArgumentResolverConfigurer().addCustomResolver(new AuthenticationPrincipalArgumentResolver());
            handler.setRSocketStrategies(strategies);
            return handler;
        }
    }
    @PreAuthorize("hasRole('USER')")
    @MessageMapping("auth-test")
    public Mono<Test> testMono(Test msg, @AuthenticationPrincipal UserDetails userDetails) {
        log.info("Got message {}", msg.getMessage());
    
        return Mono.just("Well hello")
                .map(s -> Test.newBuilder()
                        .setMessage(s)
                        .build()
                );
    }
    and despite the following UserDetailsService:
    @Service
    public class FirebaseUserDetailsService implements AuthenticationUserDetailsService<FirebaseAuthenticationToken> {
    
        @Override
        public UserDetails loadUserDetails(FirebaseAuthenticationToken token) throws UsernameNotFoundException {
            log.info("loadUserDetails gets called, authorities: {}", token.getAuthorities());
    
            return User.builder()
                    .username(token.getName())
                    .authorities(token.getAuthorities())
                    .build();
        }
    
    }
    FlumeEzra
    @FlumeEzra
    Spring will just throw Denied for the auth-test request.
    Devashish Mondal
    @debashish345

    Hello everyone, is there any code example available for a Dart client (Flutter) and a Spring WebSocket server? I'm getting an error at the connection setup stage while using @Payload. Does anyone know what the issue is?

    Dart client code (Dart package rsocket-dart):

      RSocket rsocket = await RSocketConnector.create()
          .metadataMimeType('message/x.rsocket.routing.v0')
          .dataMimeType('text/plain')
          .setupPayload(Payload.fromText('text/plain', 'test data'))
          .connect('ws://10.0.2.2:6565/rsocket');

    RSocket Java code:

        @ConnectMapping
        public void onConnect(RSocketRequester requester, @Payload String payload) { ... /// }

    property file

    spring.rsocket.server.port=6565
    spring.rsocket.server.mapping-path=/rsocket
    spring.rsocket.server.transport=websocket
    2 replies
    Parth Mudgal
    @artpar
    How can I access the session metadata sent in the connect phase? I can access the request-time metadata using the @Headers annotation, but I am unable to figure out how to get the session metadata (e.g. auth data is stored in the session metadata and I can access that with the @AuthenticationPrincipal annotation, but what about other keys?).
    3 replies
    Ghost
    @ghost~5f25c515d73408ce4feb1da3
    Does RSocket support a fallback when WebSocket is not available?
    7 replies
    Noor Khan
    @noorkhan-92
    How can I stream audio/video using the RSocket reactive model? I haven't found any tutorial or sample code for it. There are tutorials only for streaming text.
    4 replies
    Ghost
    @ghost~5f25c515d73408ce4feb1da3
    Hi, is there any tool like Postman to test RSocket?
    2 replies
    Eugene Kuleshov
    @ekuleshov
    @OlegDokuka do you by any chance have any idea what it takes to enable compression in RSocket over WebSocket in a Spring Boot server app?
    E.g. here is a ticket describing details
    spring-projects/spring-boot#29518
    iamceph
    @iamceph
    Hey, if anyone is interested, I made a Spring Boot starter for an RPC implementation of RSocket. I'm still working on it, and documentation is coming in a few days :) https://github.com/iamceph/springed-rsocket
    Alexey Kobiakov
    @kobiakov

    Hello! I got quite stuck figuring out how to control the requestN value from my Spring Boot application.

    The context: I am using an rsocket channel. I would like to limit the number of messages per connection the backend receives at a time (those can be quite bulky).

    What I do:

    1. I try to set rateLimit in my @MessageMapping handler
    2. I try to use the rsocket server rate-limiting interceptors

    Nevertheless, the very first requestN I see in the logs looks like this:

    2022-03-16 17:23:44.498 DEBUG 2265 --- [ctor-http-nio-7] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 3 Type: REQUEST_N Flags: 0b0 Length: 10 RequestN: 31

    I have no idea where the 31 comes from!
    Any hints are much appreciated!

    1 reply
    jayground8
    @jayground8
    Hello! I hope this is the right place to ask a question about the protocol status. I really like the idea of RSocket and its motivation. I wonder if it is under active development for a 1.0 release. The RSocket protocol GitHub repo doesn't seem very active.
    JasonWong
    @medusar
    Hello! Is Aeron still supported in RSocket? It seems the rsocket-transport-aeron project is archived.
    iamceph
    @iamceph
    Has anyone here tried to compile rsocket-rpc-java themselves? I am getting some weird errors, sadly.
    Basically the linker is failing on me - https://paste.gg/p/anonymous/bf3c07deb4f84dd8aba117c2ac81e4fd
    • Protobuf is installed
    1 reply
    pramodanarase
    @pramodanarase
    Hi all, I am new to the reactive world. What is the recommended way to do bidirectional communication between server and client? I tried using the client responder (https://spring.io/blog/2020/05/12/getting-started-with-rsocket-servers-calling-clients), but that is not working with our gateway. Can I use a Flux or Mono over a single channel? Is there any way I can keep a Mono or Flux open indefinitely and act on data as it is received? TIA.
    FlumeEzra
    @FlumeEzra
    Hi all, how can I set up TLS for Spring RSocket?
    I already configured TLS in my application.properties.... do I need to do anything else?
    FlumeEzra
    @FlumeEzra
    Oh and another important thing to mention... I'm using websocket transport (server side)
    KamiWan
    @KaimingWan
    Need some help
    1 reply
    Ranganath Nadella
    @Exnadella
    Can we implement RSocket with a Redis database using a Java or JavaScript client?
    4 replies
    Nickolas Heckman
    @nrheckman
    Looking for some direction/guidance/documentation for exposing micrometer metrics computed from spring-boot-starter-rsocket @MessageMapping("...") rsocket channels. Is this a thing that exists? Or do I need to record and publish metrics myself? Or maybe this is the wrong place to ask?
    7 replies
    bruce
    @sdack-cloud
    How can clients listen to server disconnection events?
    clientRSocket = RSocketConnector.create()
                .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.string)
                .dataMimeType(WellKnownMimeType.APPLICATION_JSON.string)
                .setupPayload(DefaultPayload.create("ttest"))
                .reconnect(Retry.fixedDelay(36, Duration.ofSeconds(10)))
                .connect(transport)
                .doOnSuccess {
                    Log.e("TAG", "Success")
                }
                .doOnCancel {
                    Log.e("TAG", "Cancel")
                }
                .doOnError {
                    Log.e("TAG", "Error")
                }.doFinally {
                    Log.e("TAG", "Finally")
                }.block()
    Chris Jones
    @CJMobileApps
    Hey, I've asked a question on Stack Overflow about an access denied error with RSocket security JWTs: https://stackoverflow.com/questions/73366875/spring-rsocket-security-jwt-access-denied-error
    Any help?