Oleh Dokuka
@OlegDokuka
@angusnb_twitter RSocket-RPC is the same RSocket, but it adds an RPC feature similar to what gRPC offers
so the only difference is that rsocket-rpc-java is an implementation of gRPC-style RPC on top of the RSocket protocol, built with rsocket-java
Angus Burton
@angusnb_twitter
I see
so the projects are similar but aren't related, meaning rsocket-rpc doesn't have a dependency on rsocket-java
Robert Roeser
@robertroeser
rsocket-rpc uses rsocket as a transport
the generated code is a very thin wrapper on top of RSocket - so rsocket-rpc-java generates code that uses rsocket-java etc
Josh Fix
@joshfix
can anybody advise a good reconnection strategy? i have one dockerized service accepting rsocket connections, and another service that establishes a connection. if the service that accepts connections goes down or is restarted, the other service will simply hang on any outgoing requests. it’s as if it doesn’t know the connection has been broken. the doOnError and doOnTerminate operators never get called.
i’ve also called the keepAlive method with tickPeriod, etc. values, but it doesn’t seem to have any effect. my only recourse is to restart the service and let it re-establish a connection to the accepting service
Josh Fix
@joshfix
the client side doesn’t seem to know that the connection has ever been broken
Josh Fix
@joshfix
fyi the reconnect issue is being addressed here: rsocket/rsocket-java#540
Menkir
@Menkir
I read in the motivations that RSocket supports two types of flow control. But how do I know which of these types I'm currently using? Is there a way to configure one of them?
Simon Baslé
@simonbasle
it is not an either/or situation
it supports application-level backpressure, driven by the request in the "client", and flow control for servers that are aware of their load and their siblings'
Menkir
@Menkir
@simonbasle Ah, so it doesn't matter if I'm using overflow strategies for my Fluxes? So RSocket takes care of my overhead?
Simon Baslé
@simonbasle
instead of using overflow strategies, you'd try to fine-tune the request amount and let it propagate through the network to the server
there you can implement overflow strategies, or lazy loading of data, or whatever
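A minimal sketch of what tuning the request amount means, written against the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than Reactor or RSocket, so that it runs without dependencies. RangePublisher, BatchingSubscriber and the batch size of 2 are hypothetical names and values chosen for illustration, and the publisher is single-threaded. In RSocket the same request(n) demand is what travels over the network to the responder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class RequestNDemo {

    /** Hypothetical source: emits 0..count-1, but never more items than were requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next;
                private long demand;
                private boolean draining;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done || n <= 0) {
                        return; // simplification: a full implementation signals onError for n <= 0
                    }
                    demand += n;
                    if (draining) {
                        return; // re-entrant request from onNext: the loop below picks it up
                    }
                    draining = true;
                    while (demand > 0 && next < count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (next == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscriber that asks for batchSize items and only asks again once the batch is consumed. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        int requestCalls;
        boolean completed;
        private final int batchSize;
        private int remainingInBatch;
        private Flow.Subscription subscription;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            requestBatch();
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--remainingInBatch == 0) {
                requestBatch();
            }
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException(t);
        }

        @Override
        public void onComplete() {
            completed = true;
        }

        private void requestBatch() {
            remainingInBatch = batchSize;
            requestCalls++;
            subscription.request(batchSize);
        }
    }

    public static void main(String[] args) {
        BatchingSubscriber subscriber = new BatchingSubscriber(2);
        new RangePublisher(5).subscribe(subscriber);
        System.out.println(subscriber.received + " delivered in "
                + subscriber.requestCalls + " request(n) calls");
    }
}
```

The publisher never has more than two unconsumed items outstanding here, so nothing needs to be dropped or buffered on the consumer side; any overflow handling or lazy loading can live at the producing end.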
Menkir
@Menkir
guess I got it. thx :thumbsup:
Jared Dellitt
@jareddellitt
Any chance of an RC3 of 0.12.2 being published soon?
Nuwan Sanjeewa Abeysiriwardana
@nuwansa
Mono.just(true).subscribeOn(Schedulers.elastic()).flatMap(b -> rsocket.requestResponse(...)).doOnNext(p -> System.out.println(Thread.currentThread().getName())).subscribe();
this just prints the current thread as reactor-tcp-nio-*. is that the expected behaviour?
my understanding was that doOnNext would be triggered on the elastic thread, not on the reactor-tcp thread
Oleh Dokuka
@OlegDokuka
@nuwansa subscribeOn means that your source will be generated on the specified elastic thread. But the response to your requestResponse call from RSocket will clearly arrive on a different thread.
Thus it is fine
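A rough analogy using only the JDK's CompletableFuture, not Reactor or RSocket (a deliberate substitution so the sketch has no dependencies): a callback registered on a pending result runs on whichever thread delivers that result, regardless of which thread started the request. The thread names worker-1 and io-1 are hypothetical stand-ins for the elastic scheduler and the reactor-tcp-nio event loop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackThreadDemo {

    /** Returns {thread that started the request, thread that ran the response callback}. */
    static String[] run() {
        // Stand-in for the scheduler passed to subscribeOn (assumption for illustration).
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        // Stand-in for the network event loop that delivers the response (assumption).
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-1"));
        try {
            CompletableFuture<String> startedOn = new CompletableFuture<>();
            CompletableFuture<String> response = new CompletableFuture<>();

            // Registered before the response can arrive, so the function runs on
            // the thread that later completes `response`.
            CompletableFuture<String> callbackOn =
                    response.thenApply(payload -> Thread.currentThread().getName());

            worker.execute(() -> {
                // The request is issued from the worker thread...
                startedOn.complete(Thread.currentThread().getName());
                // ...but the reply is delivered by the I/O thread.
                io.execute(() -> response.complete("pong"));
            });

            return new String[] {startedOn.join(), callbackOn.join()};
        } finally {
            worker.shutdown();
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("request started on: " + threads[0]);
        System.out.println("callback ran on:    " + threads[1]);
    }
}
```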
Nuwan Sanjeewa Abeysiriwardana
@nuwansa
thanks @OlegDokuka, is it the same for publishOn?
Oleh Dokuka
@OlegDokuka
Yes
In this particular case the behavior will be identical
Nuwan Sanjeewa Abeysiriwardana
@nuwansa
thanks @OlegDokuka
Sampietro Martin
@msampietro
Hi there! I am building a POC using RSocket
I would like to know if it is possible to talk from the server to a specific connected client (or pool of clients) through requestChannel.
It seems that the RSocketAcceptor always expects a request in order to communicate
Is it a good choice for chat/notification apps?
Pavel
@Shpall_gitlab
Hello!
I'm trying to use RSocket RPC for communication between some of our microservices. I started with this great description: https://github.com/rsocket/rsocket-rpc-java and described some of our DTOs with proto files. Communication between the services started well. Next question: is there any good way to return these entities with WebFlux?
For example, I described InternalService with InternalRequest and InternalResponse via proto files. Service A calls service B with InternalRequest and receives InternalResponse as Mono<InternalResponse>. The problem is that InternalResponse is auto-generated with a lot of internal fields, which makes it hard to return with WebFlux.
Pavel
@Shpall_gitlab
My goal is to return Mono<InternalResponse> as a DTO when someone calls service B over the REST API
Pavel
@Shpall_gitlab

I found the solution.
Posting it here in case someone needs it.

I misunderstood how WebFlux handles protobuf. By default WebFlux sends binary data with Content-Type: application/x-protobuf. So for the REST API I added the Jackson module described here: https://github.com/venth/training-webflux-grpc#serializarion--deserialization-jackson-with-protobuf and added produces = MediaType.APPLICATION_JSON_VALUE to the endpoint's @RequestMapping

Pavel
@Shpall_gitlab
Hello again
I have generated classes with rsocket-rpc and specified their package inside my spring-boot app. Now I get an exception like Cannot subclass final class, because the generated server class is final. What should I do?
Pavel
@Shpall_gitlab
Is there anybody here?
Karthikeyan Palanivelu
@pckeyan

Hi Team, I am trying to use LoadBalancedRSocketMono as below:

public static void main(String[] args) {

        List<RSocketSupplier> rsocketSuppliers = Arrays.stream(PORTS)
                .mapToObj(port -> new RSocketSupplier(() -> RSocketFactory.connect()
                        .transport(TcpClientTransport.create("localhost", port))
                        .start()))
                .collect(Collectors.toList());

        LoadBalancedRSocketMono balancer = LoadBalancedRSocketMono
                .create((Publisher<Collection<RSocketSupplier>>) s -> {
                    s.onNext(rsocketSuppliers);
                    s.onComplete();
                });

        Flux.range(0, 10)
                .flatMap(i -> balancer)
                .doOnNext(rSocket -> rSocket.requestResponse(DefaultPayload.create("test-request")).blockOptional())
                .blockLast();

    }

I am getting the exception below: I have 3 suppliers but 0.0 available sockets. Can you please let me know what I am doing wrong?

16:07:44.214 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [tcp] for localhost:7000
16:07:44.376 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 52021 (auto-detected)
16:07:44.420 [main] DEBUG io.rsocket.client.RSocketSupplierPool - Added io.rsocket.client.filter.RSocketSupplier@13805618 to leasedSuppliers
16:07:44.421 [main] DEBUG io.rsocket.client.LoadBalancedRSocketMono - Creating WeightedSocket WeightedSocket(median=0.0 quantile-low=0.0 quantile-high=0.0 inter-arrival=1000000.0 duration/pending=0.0 pending=0 availability= 0.0)-> from factory io.rsocket.client.filter.RSocketSupplier@13805618
16:07:44.421 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [tcp] for localhost:7002
16:07:44.421 [main] DEBUG io.rsocket.client.RSocketSupplierPool - Added io.rsocket.client.filter.RSocketSupplier@cb644e to leasedSuppliers
16:07:44.421 [main] DEBUG io.rsocket.client.LoadBalancedRSocketMono - Creating WeightedSocket WeightedSocket(median=0.0 quantile-low=0.0 quantile-high=0.0 inter-arrival=1000000.0 duration/pending=0.0 pending=0 availability= 0.0)-> from factory io.rsocket.client.filter.RSocketSupplier@cb644e
16:07:44.421 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating new client pool [tcp] for localhost:7001
Exception in thread "main" reactor.core.Exceptions$ReactiveException: io.rsocket.client.NoAvailableRSocketException
    at reactor.core.Exceptions.propagate(Exceptions.java:326)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:91)
    at reactor.core.publisher.Flux.blockLast(Flux.java:2391)
    at com.example.rsocket.rsocketserver.LoadBalancerClient.main(LoadBalancerClient.java:42)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
        ... 2 more
Caused by: io.rsocket.client.NoAvailableRSocketException
    at io.rsocket.client.NoAvailableRSocketException.<init>(Unknown Source)
Josh Fix
@joshfix
what does your acceptor look like? i was able to run your code fine with the following acceptor configured:
        int[] PORTS = new int[]{7001};

        TcpServerTransport tcp = TcpServerTransport.create("0.0.0.0", PORTS[0]);
        RSocketFactory.receive()
                .acceptor((connectionSetupPayload, rSocket) -> Mono.just(
                        new AbstractRSocket() {
                            @Override
                            public Mono<Payload> requestResponse(Payload payload) {
                                log.debug("Incoming RSocket request with payload: " + payload.getDataUtf8());
                                return Mono.just(DefaultPayload.create("done"));
                            }
                        }))
                .transport(tcp)
                .start().log()
                .subscribe();
Karthikeyan Palanivelu
@pckeyan

@joshfix

Hi @joshfix ,

One thing I found is that if I call block() on the line below, it all works. But I want it to be non-blocking. Please advise.

List<RSocketSupplier> rsocketSuppliers = Arrays.stream(PORTS)
                        // changed parts: the Mono.just(...) wrapper and the trailing .block()
                        .mapToObj(port -> new RSocketSupplier(() -> Mono.just(RSocketFactory.connect()
                                .transport(TcpClientTransport.create("localhost", port))
                                .start().doOnSubscribe(s -> System.out.println("RSocket connection established."+ s)).block())))
                        .collect(Collectors.toList());

Below is my Acceptor, simple as yours:

static final String HOST = "localhost";

    static Logger log = Logger.getLogger(MultipleServers.class.getName());

    static final int[] PORTS = new int[] {9000, 9001, 9002};
public static void main(String[] args) throws InterruptedException {

        Arrays.stream(PORTS)
                .forEach(port -> RSocketFactory.receive()
                        .acceptor(new SimpleSocketAcceptor("SERVER-" + port))
                        .transport(TcpServerTransport.create(HOST, port))
                        .start()
                        .subscribe());

        log.info("Servers running");

        Thread.currentThread().join();
    }

    static class SimpleSocketAcceptor implements SocketAcceptor {

        private String serverName;

        SimpleSocketAcceptor(String serverName) {
            this.serverName = serverName;
        }

        @Override
        public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
            log.info("Received setup connection on acceptor: [" + serverName + "]");
            return Mono.just(new AbstractRSocket() {
                @Override
                public Mono<Payload> requestResponse(Payload payload) {
                    log.info("Received 'request response' request with payload: [" +
                            payload.getDataUtf8() + "] on server [" + serverName + "]");
                    return Mono.just(DefaultPayload.create("test-response"));
                }
            });
        }
    }
Karthikeyan Palanivelu
@pckeyan
Team, please let me know if anyone has tried registering a service dynamically when it comes up, for the example above. I am trying to build service discovery and client-side load balancing for my microservices. Please advise.
Karthikeyan Palanivelu
@pckeyan
@joshfix Can you please confirm whether you used block() when you tested my code? If not, can you please share your code snippet so I can understand what I am doing wrong?
Josh Fix
@joshfix
here is an example of how i create a load balanced rsocket:
        socketMono = LoadBalancedRSocketMono.create(Flux.just(Collections.singleton(new RSocketSupplier(() -> RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(configProps.getHost(), configProps.getPort()))
                .start()
                .doOnSuccess(s -> log.info("RSocket connection established.", s)))))
        );
Karthikeyan Palanivelu
@pckeyan
Thank you @joshfix. Unless I call block(), my call to the backend does not work. The code looks similar; can you please help here?
When I ran your code, I got the exception below, which goes away once I use block():
10:42:39.853 [main] INFO reactor.Flux.FlatMap.1 - cancel()
Exception in thread "main" reactor.core.Exceptions$ReactiveException: io.rsocket.client.NoAvailableRSocketException
    at reactor.core.Exceptions.propagate(Exceptions.java:326)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:91)
    at reactor.core.publisher.Flux.blockLast(Flux.java:2391)
    at com.card.theia.server.LoadBalancerClient.main(LoadBalancerClient.java:79)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
        ... 2 more
Caused by: io.rsocket.client.NoAvailableRSocketException
    at io.rsocket.client.NoAvailableRSocketException.<init>(Unknown Source)
Karthikeyan Palanivelu
@pckeyan
Hi Team, is there any recommendation on how we can secure RSocket communications?
Karthikeyan Palanivelu
@pckeyan

Team, when I send a Serializable object from client to server, I get the following exception. I am using org.apache.commons.lang3.SerializationUtils serialize/deserialize. Customer is a simple POJO that implements Serializable. Please help me here. Thanks in advance.

Client Send:

        RSocket source = RSocketFactory.connect()
                .transport(TcpClientTransport.create("localhost", 9000))
                .start()
                .block();
        source
                .requestResponse(DefaultPayload.create(SerializationUtils.serialize(new Customer("John", "Doe", 23))))
                .subscribe(response -> log.info("Response Received --->" + response.getDataUtf8()));

Server Read:

Customer customer = SerializationUtils.deserialize(payload.getData().array());

The exception is thrown at the line above; I checked and I am getting a java.nio.DirectByteBuffer object:

java.lang.UnsupportedOperationException
    at java.nio.ByteBuffer.array(ByteBuffer.java:994)

Let me know what I am missing here.

Josh Fix
@joshfix
Not all ByteBuffers are backed by a byte array that you can access with the array method
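One way to handle both cases, sketched with the JDK only: copy the readable bytes out of the buffer instead of calling array(). The class and method names (ByteBufferBytes, toBytes) are hypothetical helpers, not part of rsocket-java.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ByteBufferBytes {

    /** Copies the readable bytes of any ByteBuffer (heap or direct) into a new array. */
    static byte[] toBytes(ByteBuffer buffer) {
        // duplicate() shares the content but has its own position and limit,
        // so the caller's buffer is left untouched.
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(16);
        direct.put("hello".getBytes(StandardCharsets.UTF_8));
        direct.flip();

        // A direct buffer has no accessible backing array, so array() would throw.
        System.out.println("hasArray: " + direct.hasArray());
        System.out.println(new String(toBytes(direct), StandardCharsets.UTF_8));
    }
}
```

With such a helper, the server-side read above could presumably become SerializationUtils.deserialize(toBytes(payload.getData())).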
Dominik Wojciechowski
@wojciechowskid_gitlab

Hey all! I've got a strange problem with generated proto files. I've used proto with gRPC before, but I had a bad experience with it and wanted to try out RSocket. In a small app everything worked OK, but when I tried to incorporate it into an IntelliJ IDEA plugin, it stopped working. The error is:

C:\...\main\java\pl\dwojciechowski\proto\Service.java:46: error: cannot find symbol
UnusedPrivateParameter unused) {
^
symbol: class UnusedPrivateParameter
location: class Chunk

The IDE, however, reports no issues in the generated files. Here is the simplest proto file I could come up with; it also fails to compile:

syntax = "proto3";
package pl.dwojciechowski.proto;
message Chunk { bytes Content = 1; }

and here is my build.gradle.kts config : https://pastebin.com/CuMrgmjr

I really have no idea what may be wrong here.

cyberquarks
@cyberquarks
I've been using WebSocket for video streaming and my major problem is backpressure. Is RSocket also a good solution for delivering real-time video?
Rochdi Makhlouf
@rmakhlouf.oxyeo_gitlab
Hi there,
Do you know if there is a way to wrap a shared exception into "ApplicationErrorException" between two microservices that communicate over RSocket?
Oleh Dokuka
@OlegDokuka
@all this room is archived in favor of https://gitter.im/rsocket/rsocket-java