    Oleh Dokuka
    @OlegDokuka
    If you understand Ukrainian or Russian - I can even share a talk related to the source code :D
    dnijssen
    @dnijssen
    Haha, thanks for the offer, but I'll pass on that :) I don't really speak or understand either of them :)
    Oleh Dokuka
    @OlegDokuka
    Lol, np at all)
    Mohammad Mahdi Amini
    @bmd007
    Hi
    Recently I've been wondering: what if we rewrote the whole Kafka / Kafka Streams stack using RSocket and reactive programming? Any comments or ideas?

    We already have reactive Kafka.
    How has the feedback on it been?

    Anders Clausen
    @AndersClausen
    @OlegDokuka @spencergibb when we're using backpressure and specify a max size for the buffer, is there any way to signal the producer to delay publishing new messages until the consumer is ready (instead of throwing an exception and cancelling the stream)?
    routingRSocketRequester.route("responder-rr")
                   .data("Anders")
                   .retrieveFlux(String.class)
                   .onBackpressureBuffer(5)
                   .publishOn(Schedulers.boundedElastic())
                   .subscribe(message -> {
                      try {
                         System.out.println(message);
                      } catch (Exception ex) {
                         ex.printStackTrace();
                      }
                   });
    Oleh Dokuka
    @OlegDokuka
    @AndersClausen In that scenario, the backpressure is driven by publishOn
    so if you want no more than 5 elements at a time, I would recommend using the prefetch parameter of publishOn, as in the following example
    routingRSocketRequester.route("responder-rr")
                   .data("Anders")
                   .retrieveFlux(String.class)
                   .publishOn(Schedulers.boundedElastic(), 5)
                   .subscribe(message -> {
                      try {
                         System.out.println(message);
                      } catch (Exception ex) {
                         ex.printStackTrace();
                      }
                   });
    In that case, 5 specifies the buffer size inside the publishOn operator. Since publishOn has to make sure the producer will not send more than the buffer size, it requests the appropriate amount whenever it needs to refill the buffer
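The request-driven mechanism described above can be sketched with only the JDK's `java.util.concurrent.Flow` API. This is not Reactor's `publishOn` implementation, and every name in it (`BoundedDemandSketch`, `BatchingSubscriber`, `BATCH`) is hypothetical. It only illustrates the idea that a consumer which requests at most 5 items at a time makes the producer wait instead of failing:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Illustration only: hypothetical names, JDK Flow instead of Reactor.
class BoundedDemandSketch {
    static final int BATCH = 5; // plays the role of the prefetch value

    static class BatchingSubscriber implements Flow.Subscriber<String> {
        final List<Long> requests = new ArrayList<>();   // every request(n) issued
        final List<String> received = new ArrayList<>(); // every item delivered
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int sinceLastRequest;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            ask(); // initial demand: one batch
        }

        @Override public void onNext(String item) {
            received.add(item);
            if (++sinceLastRequest == BATCH) { // batch consumed, ask for the next one
                sinceLastRequest = 0;
                ask();
            }
        }

        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }

        private void ask() {
            requests.add((long) BATCH);
            subscription.request(BATCH);
        }
    }

    static BatchingSubscriber run(int total) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BatchingSubscriber subscriber = new BatchingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>(executor, BATCH)) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < total; i++) {
                publisher.submit("message-" + i); // blocks while the subscriber's buffer is full
            }
        } // close() signals onComplete once pending items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber s = run(12);
        System.out.println("received=" + s.received.size() + " requests=" + s.requests);
    }
}
```

With 12 messages the subscriber issues three requests of 5 and receives all 12 items; the producer is never asked for more than one batch at a time.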
    Mohammad Mahdi Amini
    @bmd007
    May I ask what the latest status is on completing the RSocket Routing and Forwarding extension in rsocket-java?
    Spencer Gibb
    @spencergibb
    @bmd007 it hasn't changed much in a while
    RupWeb
    @rupweb
    @OlegDokuka thanks a lot for the response; the .NET client now connects to the Java server over WebSocket
    the only thing left now is the other question -> the route !!
    with the code I am using, I am getting
    2021-02-12T09:51:44,503 ERROR [reactor-http-nio-3] io.rsocket.core.RSocketRequester 208 handleIncomingFrames: Unexpected error during frame handling
    java.lang.IllegalStateException: Client received message for non-existent stream: 168298506, frame type: RESERVED
    from the server
    and
    0001 Error {000}: [00000201] No handler for destination ''
    at the client
    so on the server we have
        @MessageMapping("quotes")
        Flux<Object> quotes(@Payload String message) {            
            return priceService.generatePrices(message);
        }
    on the client
    RupWeb
    @rupweb
        var stream = client.RequestStream(
            resultmapper: result => (Data: PdxNet.Deserialize<QuoteClient>(result.data), Metadata: PdxNet.Deserialize<QuoteClient>(result.metadata)),
            data: PdxNet.Serialize(quoteRequest), metadata: PdxNet.Serialize(quoteRequest));
    sorry for the n00b route questions on the .NET client ! ;0
    RupWeb
    @rupweb
    hang on i think we've got it now
    RupWeb
    @rupweb
    nope... how can I route the client.RequestStream binary call to the quotes endpoint please?
    RupWeb
    @rupweb
    After setting up the binary stream with C# [Serializable] objects (as in the resultmapper and data above), I am using stream.ForEachAsync:
    await stream.ForEachAsync(quotes => Console.WriteLine($"RawDemo.OnNext===>[{quotes.Metadata}]{quotes.Data}"));
    Oleh Dokuka
    @OlegDokuka
    @rupweb you have to encode the route properly
    rsocket-net does not have composite metadata support at the moment
    so you would need to do that yourself, or encode your route as a plain string
    I have no idea how PdxNet works, but its output definitely will not be handled by Spring
    also, rsocket-routing is not the right place; better to ask at https://gitter.im/rsocket/rsocket-dotnet about the .NET implementation
    RupWeb
    @rupweb
    thanks... I got a string implementation going using
        // One length byte (6 = length of "quotes") followed by the route itself.
        byte[] intBytes = BitConverter.GetBytes(6);
        string stringBytes = Encoding.Default.GetString(intBytes, 0, 1);
        metaData = stringBytes + "quotes";
        var stringclient = new RSocketClient.ForStrings(client);    // A simple client that uses UTF8 strings instead of bytes.
        await stringclient.RequestStream(json, metaData)
            .ForEachAsync((result) =>
            {
                Console.WriteLine($"Result ===> {result}");
            });
    just want to get the binary impl going
    Oleh Dokuka
    @OlegDokuka
    var client = new RSocketClient(new WebSocketTransport("ws://localhost:8080/"));
    await client.ConnectAsync(new RSocketOptions()
    {
        MetadataMimeType = "message/x.rsocket.routing.v0",
        DataMimeType = "application/octet-stream"
    });

    Console.WriteLine("Requesting Raw Protobuf Stream...");

    var route = new ReadOnlySequence<byte>(new byte[]
    {
        (byte) Encoding.UTF8.GetByteCount("request.stream")
    }.Concat(Encoding.UTF8.GetBytes("request.stream")).ToArray());

    //Make a Raw binary call just to show how it's done.
    var stream = client.RequestStream(
        resultmapper: result => (Data: Encoding.UTF8.GetString(result.data.ToArray()), Metadata: Encoding.UTF8.GetString(result.metadata.ToArray())),
        data: new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes("test")), metadata: route);

    await stream.ForEachAsync(persons => Console.WriteLine($"RawDemo.OnNext===>[{persons.Metadata}]{persons.Data}"));
    that's what works for me
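The route metadata built in the snippet above is a single length byte followed by the UTF-8 bytes of the route, sent with the `message/x.rsocket.routing.v0` metadata MIME type. As a rough Java sketch of that same layout (the `RouteMetadataSketch` class and its `encodeRoute` / `decodeRoute` helpers are hypothetical names for illustration, not part of any RSocket library):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: hypothetical helper mirroring the byte layout used in the C# snippet.
class RouteMetadataSketch {

    // Encodes a single route as [length byte][UTF-8 bytes of the route].
    static byte[] encodeRoute(String route) {
        byte[] utf8 = route.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > 255) {
            throw new IllegalArgumentException("route does not fit a one-byte length prefix");
        }
        byte[] metadata = new byte[utf8.length + 1];
        metadata[0] = (byte) utf8.length;            // length prefix
        System.arraycopy(utf8, 0, metadata, 1, utf8.length);
        return metadata;
    }

    // Reads the first route back out of the encoded metadata.
    static String decodeRoute(byte[] metadata) {
        int length = metadata[0] & 0xFF;             // treat the prefix as unsigned
        return new String(metadata, 1, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] metadata = encodeRoute("quotes");
        System.out.println(Arrays.toString(metadata)); // [6, 113, 117, 111, 116, 101, 115]
        System.out.println(decodeRoute(metadata));     // quotes
    }
}
```

For the `quotes` route this yields the byte 6 followed by the six route bytes, which matches what the earlier string-based workaround produces by prepending the character with code 6.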
    RupWeb
    @rupweb
    Thanks :) got the route working
    Oleh Dokuka
    @OlegDokuka
    @rupweb as I mentioned before, this Gitter channel is for a different project; can we move future conversations to https://gitter.im/rsocket/rsocket-dotnet
    also, what you mentioned looks like a bug
    so it is better to create an issue at https://github.com/rsocket/rsocket-net/issues/new/choose
    Bau Nguyen Van
    @baudcctak54_gitlab
    Hi Oleh. Can I implement a routing client in JavaScript?
    kevin
    @kevinat

    Hi Oleh, can I use this dependency in my Spring Boot project?

      <dependency>
          <groupId>io.rsocket.routing</groupId>
          <artifactId>rsocket-routing-broker-spring</artifactId>
          <version>0.3.0-SNAPSHOT</version>
      </dependency>

    I configured the Spring snapshot repo in my pom, but got "Not authorized" when compiling.

    kevin
    @kevinat

    Hi Oleh,

    I learned from the source code that the routing broker uses a high-performance, tag-based query mechanism when locating a route.
    If a client submits a tag map containing hundreds of entries, will that affect performance?

    pedrorlmarques
    @pedrorlmarques
    Hello, is there any plan to have an rsocket-routing-client for JS?