    alwaystheobvious
    @alwaystheobvious
    (edit) upgrading to the latest release solved the problem
    alwaystheobvious
    @alwaystheobvious
    Hi! Should I ever release ConnectionSetupPayload? Currently, I'm releasing it:
    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
        sendingSocket
                .onClose()
                .doOnError(Throwable::printStackTrace)
                .doFinally(consumer -> {
                    setup.release();
                    System.out.println("Released setup");
                })
                .subscribe();
    
        return Mono.just(sendingSocket);
    }
    But does it block the thread? "Released setup" never gets printed out.
    Dmitri Maximovich
    @maximdim_twitter

    Hi everybody. I'm struggling to understand how the rsocket reconnect feature is supposed to work.

    Here is my code, using Spring Boot 2.4.9.

    Client:

    public Client(RSocketRequester.Builder builder, RSocketStrategies strategies) {
      HttpClient httpClient = HttpClient.create().baseUrl(backendUrl);
    
      requester = builder
         .setupRoute("/backend/connect")
         .setupData(id)
         .rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(10))))
         .dataMimeType(MediaType.APPLICATION_JSON)
         .transport(WebsocketClientTransport.create(httpClient, "/rsocket"));
    
      Flux.interval(Duration.ofSeconds(60))
          .map(tick -> "Telemetry from " + id + " tick " + tick)
          .flatMap(msg -> requester.route("/backend/telemetry").data(msg).retrieveMono(Void.class))
          .retry()
          .subscribe();
    }

    Server:

    @ConnectMapping("/backend/connect")
    public Mono<Void> connect(RSocketRequester requester, @Payload String id) {
      requester.rsocket()
        .onClose()
        .doFirst(() -> {
          logger.info("Connected {}", id);
        })
        .doOnError(error -> logger.warn("Error {}: {}", id, error.getMessage()))
        .doFinally(consumer -> {
          logger.info("Disconnected {}", id);
        })
        .subscribe();
    
      return Mono.empty();
    }
    
    @MessageMapping("/backend/telemetry")
    public Mono<Void> telemetry(String message) {
      logger.info("Telemetry received: " + message);
      return Mono.empty();
    }

    Now the client connects to the server fine, and I'm trying to simulate a network failure between them using the socat command:

    socat -d TCP-LISTEN:9897,fork,reuseaddr TCP:172.17.24.149:9898

    When the connection is interrupted (by terminating the socat command) and then re-established, it takes the client about 30 seconds on average to reconnect, which seems to correlate with the Flux.interval(60) in the client that calls the telemetry endpoint. It feels like the connection is not established or re-established unless there is a call from the client to the server. Is that how it is supposed to work? For example, if I comment out Flux.interval() on the client, the server doesn't even log the 'Connected' message. Also, changing the connector.reconnect() parameters doesn't seem to change the reconnection speed.

    I hope I'm doing something wrong here - would appreciate your feedback.

    Thanks!

    alwaystheobvious
    @alwaystheobvious
    Hi team!
        clientRSocket
                .requestStream(DefaultPayload.create("Hello from request1"))
                .doOnNext(payload -> {
                    System.out.println(payload.getDataUtf8());
                    try {
                        Thread.sleep(30000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                })
                .subscribe();
    
    
    
        clientRSocket
                .requestStream(DefaultPayload.create("Hello from request2"))
                .doOnNext(payload -> {
                    System.out.println(payload.getDataUtf8());
                })
                .subscribe();
    How can I receive responses to the 2nd request while waiting for request 1 to get processed?
    alwaystheobvious
    @alwaystheobvious
    With Thread.sleep(30000) I'm imitating the download of big binary data.
    If the user sends a second request, which has now become the primary request that needs to be fulfilled, I pause the first request and focus on the second one.
    But the server won't send anything to the client until it has completed the 1st request.
    alwaystheobvious
    @alwaystheobvious
    (Edit) just had to use publishOn() operator on requestStream flux
    Violeta Georgieva
    @violetagg
    @alwaystheobvious When you have a blocking or CPU-intensive operation, you may consider doing this: https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
    alwaystheobvious
    @alwaystheobvious
    Ohh, thank you. @violetagg
    Is there a built-in way to "treat" the first element of a flux differently than the rest? For example, I always send the text data first because it lets me set up many things fast, and only then start sending the binary data.
    alwaystheobvious
    @alwaystheobvious
        Flux.just(textData, binaryData, binaryData, binaryData, binaryData)
                .doOnNext(incomingData -> {
    
                    if (incomingData.isText()) {
                       //do this  
                    } else {
                        //do that 
                    }
    
                })
                .subscribe();
    I was thinking about nesting a mono - text and a flux - binary request, but i don't think that's too efficient
    alwaystheobvious
    @alwaystheobvious
    Maybe something like this:
        Flux.just(textData, binaryData, binaryData, binaryData, binaryData)
                .doOnFirst(textData -> {
                    //work with text data
                })
                .doOnNext(binaryData -> {
                    //work with binary data
                })
                .subscribe();
    alwaystheobvious
    @alwaystheobvious
    So in the end, after a bunch of different solutions, I've come to this one that I think is good enough:
        Flux<Payload> flux = clientRSocket
                .requestStream(DefaultPayload.create("Hello from request1"))
                .doOnComplete(() -> System.out.println("Done"))
                .share();
    
    
    
        flux.next()
                .subscribe(p -> {
                    System.out.println(p.getDataUtf8() + "  text");
                });
    
    
        flux.skip(1)
                .doOnNext(p -> {
                    System.out.println(p.getDataUtf8() + "  binary");
                }).subscribe();
    I appreciate any review on it :)
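One point to check in the share()-based version above: it relies on two subscriptions to the same shared stream, so its behaviour may depend on both subscribers being attached before the first element arrives. A minimal sketch of an alternative that needs only one subscription is a small stateful consumer that treats the first element differently from the rest. `FirstThenRest` is a hypothetical helper written for this illustration, not part of Reactor or RSocket. Reactor's own `switchOnFirst` operator on `Flux` may also be worth a look for this use case.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical helper (not part of Reactor or RSocket): routes the first
// element it sees to one handler and every later element to another.
final class FirstThenRest<T> implements Consumer<T> {
    private final AtomicBoolean firstSeen = new AtomicBoolean(false);
    private final Consumer<? super T> onFirst;
    private final Consumer<? super T> onRest;

    FirstThenRest(Consumer<? super T> onFirst, Consumer<? super T> onRest) {
        this.onFirst = onFirst;
        this.onRest = onRest;
    }

    @Override
    public void accept(T element) {
        // compareAndSet succeeds exactly once, so only one element is ever "first".
        if (firstSeen.compareAndSet(false, true)) {
            onFirst.accept(element);
        } else {
            onRest.accept(element);
        }
    }
}
```

Because it is a plain `Consumer`, an instance could be passed to `doOnNext(...)` on the request stream. The state lives in the instance, so each subscription would need its own.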
    Tobsch
    @schneider:synyx.de
    Hey, we have an rsocket-js client and an rsocket-java WebFlux backend, with RSocket running over WebSocket. Our problem is that Spring Security removes the WebSession after 30 minutes of inactivity over HTTP. The WebSocket is secured via Spring Security, and my hope was that the RSocket keepalive (ping/pong) would trigger the WebSession's updateLastAccessTime method. But it does not. Is there a way to tell Spring Security to listen to the RSocket keepalive?
    We send the jwt on the setup frame
    Leuteris Katiforis
    @typosevery:matrix.org

    Given the following responder:

    @MessageMapping("submit")
        Mono<SubmitResponse> submit(Request request) {
            return service.submit(request, context)
                    .then(Mono.<SubmitResponse>just(SubmitResponse.builder()
                            .result(ApiResponse.Result.SUCCESS)
                            .build()))
                .log();
        }

    Could you explain why it logs a cancel() signal:

    2021-08-31 15:37:04,536 INFO reactor-http-epoll-5 r.M.I.1 - request(1)
    2021-08-31 15:37:04,669 INFO reactor-http-epoll-4 r.M.I.1 - onNext((SUCCESS))
    2021-08-31 15:37:04,675 INFO reactor-http-epoll-4 r.M.I.1 - cancel()
    2021-08-31 15:37:04,679 INFO reactor-http-epoll-4 r.M.I.1 - request(9223372036854775806)
    2021-08-31 15:37:04,680 INFO reactor-http-epoll-4 r.M.I.1 - onComplete()

    Bartłomiej Piech
    @delor

    given the code in example

    fun main(args: Array<String>) {
        val client: RSocket = RSocketConnector.connectWith(TcpClientTransport.create(9898)).block()!!
        client.requestStream(DefaultPayload.create("{\"sessionId\":\"A\",\"connectionId\":\"${UUID.randomUUID()}\"}"))
            .take(10)
            .doOnNext { println(it.dataUtf8) }
            .subscribe()
        System.`in`.read()
        client.dispose()
    }

    how do I specify the route?

    khizar khan
    @KhanKhankhel25_twitter
    I have this code from a Spring Boot article on RSocket (server calling clients). Here they store all the connected clients. What I wanted to ask is: if the server knows about the connected clients, is there also a way to send a message from client1 to client2? Can anyone give me a pointer on that? Thanks.
    @ConnectMapping("shell-client")
        void connectShellClientAndAskForTelemetry(RSocketRequester requester,
                                                  @Payload String client) {
    
            requester.rsocket()
                    .onClose()
                    .doFirst(() -> {
                        // Add all new clients to a client list
                        log.info("Client: {} CONNECTED.", client);
                        CLIENTS.add(requester);
                    })
                    .doOnError(error -> {
                        // Warn when channels are closed by clients
                        log.warn("Channel to client {} CLOSED", client);
                    })
                    .doFinally(consumer -> {
                        // Remove disconnected clients from the client list
                        CLIENTS.remove(requester);
                        log.info("Client {} DISCONNECTED", client);
                    })
                    .subscribe();
    
            // Callback to client, confirming connection
            requester.route("client-status")
                    .data("OPEN")
                    .retrieveFlux(String.class)
                    .doOnNext(s -> log.info("Client: {} Free Memory: {}.", client, s))
                    .subscribe();
        }
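One way to approach forwarding from client1 to client2 is to keep the connected clients in a map keyed by client ID, so the server can look up the target and issue a request on its requester. The sketch below is a minimal, framework-free illustration under that assumption. `ClientRegistry` and its method names are hypothetical, and the type parameter `H` stands in for whatever handle the server stores per client (an `RSocketRequester` in the handler above). It uses a `ConcurrentHashMap` because connect and disconnect callbacks may run on different threads.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry; H stands in for the per-client handle the server keeps.
final class ClientRegistry<H> {
    private final ConcurrentMap<String, H> clients = new ConcurrentHashMap<>();

    // Returns false when the ID is already taken, so the caller can reject a duplicate.
    boolean register(String clientId, H handle) {
        return clients.putIfAbsent(clientId, handle) == null;
    }

    // Removes the entry only if it still maps to this handle, so a late
    // disconnect callback cannot evict a newer connection with the same ID.
    boolean unregister(String clientId, H handle) {
        return clients.remove(clientId, handle);
    }

    // Looks up the handle for a receiver ID; empty when that client is not connected.
    Optional<H> find(String clientId) {
        return Optional.ofNullable(clients.get(clientId));
    }
}
```

In the connect handler, `register` would be called where the client is added to the list and `unregister` in the `doFinally` callback. A message handler could then call `find(receiverId)` and decide what to do when the receiver is not connected.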
    khizar khan
    @KhanKhankhel25_twitter

    Hey guys, please help. I have an RSocket server and a client application. The client application has RSocketStrategies with custom MIME types defined. Below is the bean.

    @Bean
        public RSocketStrategies rSocketStrategies() {
            return RSocketStrategies.builder()
                    .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                    .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                    .metadataExtractorRegistry(metadataExtractorRegistry -> {
                        metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(Constants.MIME_FILE_EXTENSION), String.class, Constants.FILE_EXTN);
                        metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(Constants.MIME_FILE_NAME), String.class, Constants.FILE_NAME);
                    })
                    .build();
        }

    And in the client application I have this code for routing to the server endpoint:

    rSocketRequester.route("fileupload")
                            .metadata(metadataSpec -> {
                                metadataSpec.metadata("pdf", MimeType.valueOf(Constants.MIME_FILE_EXTENSION));
                                metadataSpec.metadata("output", MimeType.valueOf(Constants.MIME_FILE_NAME));
                            })
                        .data(readFlux)
                    .retrieveFlux(Status.class)
                    .doOnNext(s -> System.out.println("Upload Status : " + s))
                    .subscribe();

    I have given the MIME types for the file to be uploaded to the server. This is my server-side code; I used System.out.println to see whether the values are extracted.

     @MessageMapping("fileupload")
        public Flux<Status> upload(@Headers Map<String, Object> metadata, @Payload Flux<DataBuffer> content) throws IOException {
            System.out.println("file upload controller");
    
            Set<String> keys = metadata.keySet();
            for (String key: keys) {
                System.out.println("key : " + key + " value : " + metadata.get(key));
            }
    
            var fileName = metadata.get(Constants.FILE_NAME);
            var fileExtn = metadata.get(Constants.FILE_EXTN);
    
            System.out.println(fileName);
            System.out.println(fileExtn);
    
            var path = Paths.get(fileName + "." + fileExtn);
            System.out.println("Path :  " + path);
    
            return Flux.concat(service.uploadFile(path, content), Mono.just(Status.COMPLETED))
                        .onErrorReturn(Status.FAILED);
    
        }

    The problem is that the variables storing the file name and extension are null. Please help.
    This is the server-side runtime output.

    key : dataBufferFactory value : NettyDataBufferFactory (PooledByteBufAllocator(directByDefault: true))
    key : rsocketRequester value : org.springframework.messaging.rsocket.DefaultRSocketRequester@4516b4f1
    key : rsocketResponse value : null
    key : lookupDestination value : fileupload
    key : contentType value : application/cbor
    key : rsocketFrameType value : REQUEST_CHANNEL
    null
    null
    Path : null.null

    khizar khan
    @KhanKhankhel25_twitter

    @OlegDokuka help please. I have an RSocket client and an RSocket server. The client sends a file to the server. I have defined the metadata extraction in the RSocketStrategies of the client's RSocket configuration, but it's not working. Here is the class.

    @Configuration
    public class RSocketConfig {
    
        @Bean
        public RSocketStrategies rSocketStrategies() {
            return RSocketStrategies.builder()
                    .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                    .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                    .metadataExtractorRegistry(metadataExtractorRegistry -> {
                        metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(Constants.MIME_FILE_EXTENSION), String.class, Constants.FILE_EXTN);
                        metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(Constants.MIME_FILE_NAME), String.class, Constants.FILE_NAME);
                    })
                    .build();
        }
    
        @Bean
        RSocketRequester rSocketRequester(RSocketRequester.Builder builder){
            return
                    builder
                            .rsocketConnector(rSocketConnector ->
                                    rSocketConnector.reconnect(Retry.backoff(10, Duration.ofMillis(10))))
                            .tcp("localhost", 6565);
        }
    }

    And here is the server-side code. I printed all the keys of the metadata map, but nothing was found. Please help. Here is the code.

    @MessageMapping("fileupload")
        public Flux<Status> upload(@Headers Map<String, Object> metadata, @Payload Flux<DataBuffer> content) throws IOException {
            System.out.println("file upload controller");
    
            Set<String> keys = metadata.keySet();
            for (String key: keys) {
                System.out.println("key : " + key + " value : " + metadata.get(key));
            }
    
            var fileName = metadata.get(Constants.FILE_NAME);
            var fileExtn = metadata.get(Constants.FILE_EXTN);
    
            System.out.println(fileName);
            System.out.println(fileExtn);
    
            var path = Paths.get(fileName + "." + fileExtn);
            System.out.println("Path :  " + path);
    
            return Flux.concat(service.uploadFile(path, content), Mono.just(Status.COMPLETED))
                        .onErrorReturn(Status.FAILED)
                        .doOnComplete(() -> {
                            System.out.println("Upload Successful.");
                        });
    
        }
    khizar khan
    @KhanKhankhel25_twitter
    can i have any review on the above question.
    khizar khan
    @KhanKhankhel25_twitter

    please review my code i have this code on server side

    @MessageMapping("ReqStream")
        public Flux<DataBuffer> stream(Mono<String> nameMono) throws IOException {
            if (resource.exists()) System.out.println("true");
    
            Flux<DataBuffer> dataBuffer = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 1048576)
                    .doOnNext(s -> System.out.println("Sent"));
            return dataBuffer;
        }

    The above code sends a file as a Flux<DataBuffer>. On the client side I have:

    rSocketRequester.route("ReqStream")
                    .retrieveFlux(DataBuffer.class)
                    .doOnNext(dataBuffer  -> {
                        try {
                            service.uploadFile(path, dataBuffer);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    })
                    .doOnComplete(() -> System.out.println("Upload Complete"))
                    .subscribe();

    The above code routes to the server and requests a stream that sends the file as bytes, but the file created on the client side shows 0 bytes. Please help.
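The body of service.uploadFile is not shown, so the cause cannot be pinned down from this snippet. Two things may be worth checking: whether it returns a publisher that is never subscribed when called inside doOnNext, and whether each call reopens and truncates the target file. A minimal sketch of the second concern, in plain Java and assuming the chunks are available as byte arrays, opens the file once and writes the chunks in arrival order. `ChunkFileWriter` and `writeAll` are hypothetical names for this illustration. Spring's `DataBufferUtils.write` may be a more direct fit for a `Flux<DataBuffer>`.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: writes all chunks to one file through a single open stream.
final class ChunkFileWriter {
    private ChunkFileWriter() {}

    // Returns the total number of bytes written.
    static long writeAll(Path target, Iterable<byte[]> chunks) {
        long written = 0;
        // The file is opened (and truncated) once, before the first chunk;
        // every later chunk continues where the previous one ended.
        try (OutputStream out = Files.newOutputStream(target,
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE)) {
            for (byte[] chunk : chunks) {
                out.write(chunk);
                written += chunk.length;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // try-with-resources has closed the stream here, flushing the data to the file.
        return written;
    }
}
```

If the file were instead opened with truncation on every chunk, only the last chunk would survive, and an empty final chunk or a stream that is never closed could leave a file that looks empty.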

    alwaystheobvious
    @alwaystheobvious
    Hi all
         @Override
         public Mono<Payload> requestResponse(Payload payload) {
            Mono<String> mono = Mono.fromCallable(() -> "Hello from server").subscribeOn(Schedulers.boundedElastic());
            String response = mono.block();
            System.out.println(response);
            return Mono.just(DefaultPayload.create(response));
         }
    Why won't mono.block() work on server?
    mono.subscribe works but the blocking call won't...
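A likely reason, assuming the handler runs on a Reactor Netty event-loop thread: Reactor refuses to block on threads it marks as non-blocking and fails the block() call with an IllegalStateException. The usual way out is to return the pipeline instead of waiting for its value, that is, map the Mono to a payload and return it without calling block(). The sketch below shows the same idea with the JDK's `CompletableFuture` as a stand-in for `Mono`, so it runs without any Reactor dependency. `NonBlockingHandler` and `respond` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical stand-in for the handler above, with CompletableFuture in place of Mono.
final class NonBlockingHandler {
    private NonBlockingHandler() {}

    // Produces the value on the worker executor and transforms it inside the
    // pipeline; the calling thread gets the future back without waiting for the result.
    static CompletableFuture<String> respond(Executor worker) {
        return CompletableFuture
                .supplyAsync(() -> "Hello from server", worker)
                .thenApply(greeting -> "payload:" + greeting);
    }
}
```

The caller, here the framework, decides when to consume the result, so the thread that invoked the handler is free to serve other frames in the meantime.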
    alwaystheobvious
    @alwaystheobvious
    since blocking is not advised but i'd still like to read the image files from other thread and not main i came up with:
         Flux<ImageMessage> imageMessageFlux(List<String> imagePath) {
             return Flux
                     .fromIterable(imagePath)
                     .subscribeOn(Schedulers.boundedElastic())
                     .map(handler::readImageFromDisk)
                     .map(handler::createMessage);
         }
    on the reactor netty channel i was told that maybe i could stream this whole thing rather than collecting everything in memory
    could anyone help me with that? i mainly use requestStream()
    a very basic solution with Flux<Payload>, String would do it : )
    if it's possible ... ofc
    Bazzinga
    @Vadim_S_1_twitter
    Hi all. I can't find information on how to configure TLS in spring-boot-starter-rsocket. I found some examples, but they are not for Spring Boot: https://github.com/reactor/reactor-netty/blob/main/reactor-netty-examples/src/main/java/reactor/netty/examples/http/helloworld/HelloWorldServer.java
    Pavel Grigorenko
    @MastaP

    Hi, we have a use case where the server needs to reject a connecting RSocket client if it has the same ID as an already connected client.
    So far we’ve used @ConnectMapping to register clients, and this method also returns Mono.error(RejectedSetupException()) in case of a duplicate.
    Now, on the client side, the rejected client receives two exceptions:

    ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
    reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
    Caused by: java.util.concurrent.CancellationException: Disposed
        at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545)
        at io.rsocket.transport.netty.WebsocketDuplexConnection.doOnClose(WebsocketDuplexConnection.java:72)
        at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30)
    ...
        at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
        at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51)

    and

    reactor.core.Exceptions$ErrorCallbackNotImplemented: RejectedSetupException (0x3): MPC Gateway with ID 'mpc1' already connected
    Caused by: io.rsocket.exceptions.RejectedSetupException: MPC Gateway with ID 'mpc1' already connected
        at io.rsocket.exceptions.Exceptions.from(Exceptions.java:62)
        at io.rsocket.core.RSocketRequester.lambda$tryTerminateOnZeroError$4(RSocketRequester.java:313)
        at io.rsocket.core.RSocketRequester.tryTerminate(RSocketRequester.java:318)
        at io.rsocket.core.RSocketRequester.tryTerminateOnZeroError(RSocketRequester.java:313)
        at io.rsocket.core.RSocketRequester.handleStreamZero(RSocketRequester.java:224)
        at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:209)
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
        at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248)

    Both are ErrorCallbackNotImplemented.
    Question: which is the correct place to hook into in order to handle these errors? Maybe we’re missing something basic here.
    Another question is, should we look into interceptors instead of @ConnectMapping for the use case above?

    khizar khan
    @KhanKhankhel25_twitter
    @OlegDokuka is there a way I can connect two clients? For example, one client sends a request to the server asking to be connected to another client. The server keeps track of all the connected clients, so it calls the other client and connects client1 with client2, with data passing bidirectionally between them through the server. Any pointers would help. Thanks.
    khizar khan
    @KhanKhankhel25_twitter
    Hey, can I create a bidirectional channel between two clients through the server?
    khizar khan
    @KhanKhankhel25_twitter
    What does this error mean?
    reactor.core.Exceptions$ErrorCallbackNotImplemented: ApplicationErrorException (0x201): Destination 'channel' does not support REQUEST_STREAM. Supported interaction(s): [REQUEST_CHANNEL]
    khizar khan
    @KhanKhankhel25_twitter

    Please suggest changes to the code. It works the way I want it to. There are two RSocket clients developed with Spring (Java) and one Spring RSocket server. The flow is: the server is running and stores connected clients in a map using

    private final List<RSocketRequester> CLIENTS = new ArrayList<>();
    private final Map<String, RSocketRequester> clientsMap = new HashMap<>();
    
    @ConnectMapping("shell-client")
        void connectShellClientAndAskForTelemetry(RSocketRequester requester,
                                                  @Payload String client) {
    
            requester.rsocket()
                    .onClose()
                    .doFirst(() -> {
                        // Add all new clients to a client list
                        log.info("Client: {} CONNECTED.", client);
                        CLIENTS.add(requester);
                        clientsMap.put(client, requester);
                    })
                    .doOnError(error -> {
                        // Warn when channels are closed by clients
                        log.warn("Channel to client {} CLOSED", client);
                    })
                    .doFinally(consumer -> {
                        // Remove disconnected clients from the client list
                        CLIENTS.remove(requester);
                        clientsMap.remove(client);
                        log.info("Client {} DISCONNECTED", client);
                    })
                    .subscribe();
    }

    Both clients send messages to each other through the server. The server code is:

    @MessageMapping("channel")
        Flux<Message> channel(@Payload Flux<Message> messageFlux) {
            log.info("Received channel request...");
    
    
            return messageFlux
                        .doOnNext(message1 -> System.out.println("Sender : " + message1.getSender() +
                                " Receiver : " + message1.getReceiver() +
                                " Content : " + message1.getContent()))
                        .switchMap(message1 ->
                            clientsMap.get(message1.getReceiver())
                                    .route("channel")
                                    .data(Flux.just(message1))
                                    .retrieveFlux(Message.class)
                        )
                        .log();  
        }

    And this is the Message class:

    public class Message {
    
        private String sender;
        private String receiver;
        private String content;
    
        public Message(String sender, String receiver, String content){
            this.sender = sender;
            this.receiver = receiver;
            this.content = content;
        }
    }

    I'd like suggestions on my server code. It simulates a single voice call.

    Jan Tobola
    @jantobola

    Hi guys, can someone help me understand how the reconnect feature works? I have one scenario that does not work as I would expect.
    I'm using Spring's RSocketRequester as a client. Consider the following code:

    val requester = RSocketRequester.builder()
        .rsocketStrategies {
            it.decoder(Jackson2JsonDecoder())
            it.encoder(Jackson2JsonEncoder())
        }
        .rsocketConnector {
            it.reconnect(Retry.max(3).doBeforeRetry {
                runBlocking {
                    delay(Duration.seconds(1))
                    log.warn { "Unable to connect to Elliot Manager. Retrying... (${it.totalRetries()})" }
                }
            })
        }
        .dataMimeType(MediaType.APPLICATION_JSON)
        .setupData(ElliotSetupPayload("client-1"))
        .websocket(URI.create("ws://localhost:5000"))
    
    // first call
    log.info { "first call" }
    try {
        val collection = requester
            .route("v1.resourceCollections.byName({name})", "polygon")
            .retrieveMono(CommonResourceCollectionResponse::class.java)
            .block()
        log.info { collection.toString() }
    } catch (e: Exception) {
    
    }
    
    // second call
    log.info { "second call" }
    val collection2 = requester
        .route("v1.resourceCollections.byName({name})", "polygon")
        .retrieveMono(CommonResourceCollectionResponse::class.java)
        .block()
    log.info { collection2.toString() }

    When I run this AND the server is down, the result is the following:

    12:41:19.247 [main] INFO Client - first call
    12:41:20.586 [reactor-tcp-epoll-2] WARN Client - Unable to connect to Elliot Manager. Retrying... (0)
    12:41:21.600 [reactor-tcp-epoll-2] WARN Client - Unable to connect to Elliot Manager. Retrying... (1)
    12:41:22.605 [reactor-tcp-epoll-2] WARN Client - Unable to connect to Elliot Manager. Retrying... (2)
    12:41:22.608 [main] INFO Client - second call
    Exception in thread "main" reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
        at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)
        at reactor.util.retry.RetrySpec.lambda$static$2(RetrySpec.java:61)
        ...
        Suppressed: java.lang.Exception: #block terminated with an error
            at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
            at ClientKt.main(Client.kt:149)
        Suppressed: java.lang.Exception: #block terminated with an error
            at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
            at ClientKt.main(Client.kt:160)
    Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: localhost/127.0.0.1:5000
    Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
        at io.netty.channel.unix.Errors.newConnectException0(Errors.java:155)
        at io.netty.channel.unix.Errors.handleConnectErrno(Errors.java:128)
        at io.netty.channel.unix.Socket.finishConnect(Socket.java:320)
        at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:710)
            ...
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

    Why doesn't the client try to reconnect 3 times during the second call? And how would you make it behave like this:
    use the same connection while the connection is open (so both calls use the same session), and try to reconnect (not indefinitely) on every call when the connection is not open or could not be established?
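The behaviour asked for here, a fresh and bounded retry budget on every call, can be sketched independently of RSocket. The helper below is a plain-Java illustration of that semantics only. `BoundedRetry` and its parameters are hypothetical, and it blocks the calling thread between attempts, so it suits blocking call sites like the `.block()` calls above rather than event-loop code. In Reactor terms, a comparable effect might be reached by applying `retryWhen(Retry.fixedDelay(...))` to each request Mono rather than relying only on the connector-level reconnect.

```java
import java.util.function.Supplier;

// Hypothetical helper: gives every call its own retry budget.
final class BoundedRetry {
    private BoundedRetry() {}

    // Runs the call once and, on failure, up to maxRetries more times,
    // sleeping delayMillis between attempts. Rethrows the last failure.
    static <T> T call(Supplier<T> call, int maxRetries, long delayMillis) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e;
                // No point in sleeping after the final attempt.
                if (attempt < maxRetries) {
                    sleep(delayMillis);
                }
            }
        }
        throw last;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop retrying.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

Because the attempt counter is local to each invocation, a second call starts with a full budget even if the first call exhausted its own.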

    Jan Tobola
    @jantobola

    What about infinite reconnect with a combination of kotlin coroutines? Let me demonstrate...

    private fun <T> timeoutCall(timeout: Duration = Duration.seconds(3), call: suspend () -> T?): T? = runBlocking {
        try {
            withTimeout(timeout) {
                call()
            }
        } catch (e: TimeoutCancellationException) {
            log.warn { "RSocket call timed out." }
            null
        }
    }
    
    val requester = RSocketRequester.builder()
        .rsocketStrategies {
            it.decoder(Jackson2JsonDecoder())
            it.encoder(Jackson2JsonEncoder())
        }
        .rsocketConnector {
            it.reconnect(Retry.indefinitely().doBeforeRetry {
                runBlocking {
                    delay(Duration.seconds(1))
                    log.debug { "Unable to connect to Elliot Manager. Retrying... (${it.totalRetries()})" }
                }
            })
        }
        .dataMimeType(MediaType.APPLICATION_JSON)
        .setupData(ElliotSetupPayload("client-1"))
        .websocket(URI.create("ws://localhost:5000"))
    
    log.info { "first call" }
    timeoutCall {
        requester
            .route("v1.resourceCollections.byName({name})", "polygon")
            .retrieveAndAwaitOrNull<CommonResourceCollectionResponse>()
    }?.let {
        log.info { it.toString() }
    }
    
    log.info { "little pause" }
    runBlocking { delay(10000) }
    log.info { "continuing" }
    
    // second call
    log.info { "second call" }
    timeoutCall {
        requester
            .route("v1.resourceCollections.byName({name})", "polygon")
            .retrieveAndAwaitOrNull<CommonResourceCollectionResponse>()
    }?.let {
        log.info { it.toString() }
    }
    
    log.info { "Finished" }

    This will produce:

     --> Server not running
    13:55:35.964 [main] INFO Client - first call
    13:55:39.017 [main] WARN Client - RSocket call timed out.
    13:55:39.018 [main] INFO Client - little pause
     --> Server started
    13:55:49.021 [main] INFO Client - continuing
    13:55:49.022 [main] INFO Client - second call
     --> Reconnected
    13:55:49.260 [main] INFO Client - CommonResourceCollectionResponse(id=61449f4b09394f7580cef144, type=TABLE, name=polygon, lockable=true)
    13:55:49.261 [main] INFO Client - Finished

    So it is working as expected. Is this the way to go, or am I missing something?

    Greg Adams
    @gadams00
    When using spring-boot-starter-rsocket is there a way to send response payload metadata from a MessageMapping-annotated method?