by

Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • 08:51
    violetagg milestoned #539
  • 08:51
    violetagg demilestoned #539
  • 08:50
    violetagg demilestoned #622
  • 08:50
    violetagg milestoned #622
  • 08:45
    violetagg labeled #1221
  • 08:45
    violetagg unlabeled #1221
  • 08:45
    violetagg closed #1221
  • 08:45
    violetagg commented #1221
  • 08:44
    violetagg unlabeled #1220
  • 08:44
    violetagg labeled #1220
  • 08:44
    violetagg unlabeled #1220
  • 08:44
    violetagg closed #1220
  • 08:44
    violetagg commented #1220
  • 08:42
    violetagg assigned #1054
  • 08:42
    violetagg milestoned #1054
  • 08:42
    violetagg demilestoned #1054
  • 07:59
    violetagg commented #1054
  • 07:47
    violetagg closed #1249
  • 07:47
    simonbasle edited #1249
  • 07:41
    codecov-commenter commented #1249
Aniket Panchariya
@aniketpanchariya

Hello all @violetagg ,
I have a strange behavior in which I have many CLOSE WAIT connections never closed by netty. These connections are with my downstream (incoming connections) and not upstream (outgoing connections).

Expected behavior:
CLOSE_WAIT connections should be closed

Actual Behavior:
CLOSE_WAIT connections dont appear initially, after a major GC happens CLOSE_WAIT connections count is continuously increasing and eventually failing k8s probe and restarting the pod.

Some other information:
Netstat summary:
netstat | awk '{print $NF;}' | sort | uniq -c
1 661131249
1 661145188
1 661146358
1 661158260
83 CLOSE_WAIT
1053 ESTABLISHED
1 Path
1 State
86 TIME_WAIT

Reactor Netty version: 0.9.7
reactor-core:3.3.0.RELEASE
org.springframework:spring-webflux:5.2.6.RELEASE

violetagg
@violetagg
@aniketpanchariya And do you see the same behaviour with the latest releases?
reactor netty 0.9.9, reactor core 3.3.7 and spring framework 5.2.7
Aniket Panchariya
@aniketpanchariya
@violetagg I havent tried with that, let me try and share the result. Quick question, i have configured webclient(keepalive, readtimeout, writetimeout handler etc) for outbound traffic but nothing done from inbound connections. Any handlers needs to be implemented for the same ?
violetagg
@violetagg
these all are related to timeouts, do you observe such?
Aniket Panchariya
@aniketpanchariya
till 1st major GC happens everything is smooth, during major GC the client gets some timeouts.
33 replies
Aniket Panchariya
@aniketpanchariya

@violetagg
One more thing i observed is,
a. I enabled ConnectionProvider metrics and the total connections and active connections keeps on increasing till maxConnection limit is hit.

Screen Shot 2020-07-12 at 0.48.55.png

b. After that I got an NettyException

[8c3e8ad0] Error [reactor.netty.ReactorNetty$InternalNettyException: java.nio.channels.ClosedChannelException] for HTTP POST "/****/v1", but ServerHttpResponse already committed (207 MULTI_STATUS)

My connectionprovider config. (Not sure if maxLifetime and maxIdle time are working correctly)

ConnectionProvider.builder(CONNECTION_POOL_NAME).maxConnections(maxConnection)
                    .pendingAcquireTimeout(Duration.ofMillis(ConnectionProvider.DEFAULT_POOL_ACQUIRE_TIMEOUT))
                    .metrics(true)
                    .maxLifeTime(Duration.ofMinutes(maxLifeTime))
                    .maxIdleTime(Duration.ofSeconds(maxConnectionIdleTime))
                    .build();
violetagg
@violetagg
and the webclient configuration?
@aniketpanchariya ^^^
Aniket Panchariya
@aniketpanchariya
    /**
     * Create the reactive netty WebClient
     *
     * @param connectionMap connection map
     * @return The WebClient to perform reactive web request
     */
    WebClient getWebClient(Map<String, String> connectionMap) {
        return webClientBuilder.clientConnector(new ReactorClientHttpConnector(getHttpClient(connectionMap))).build();
    }


    /**
     * Prepare Reactor-Netty HttpClient with Http-Proxy-Settings, Read-Timeout and Write-Timeout.
     *
     * @param connectionMap connection map
     * @return The http client
     */
    private HttpClient getHttpClient(Map<String, String> connectionMap) {
        return HttpClient.create(connectionProvider).tcpConfiguration(getTcpMapper(connectionMap));
    }

    /**
     * Initialize tcp client with it's basic ChannelOption settings.
     * SO_KEEPALIVE         If true then the channel try to keep alive when server allows similar functionality
     * TCP_NODELAY          improved latency and throughput on a certain workload. Nagle's algorithm was introduced
     * to reduce the overhead of TCP/IP packets (i.e. smaller a packet, bigger the overhead).
     * It is basically to prevent a poor user application from generating too many small packets.
     * AUTO_CLOSE           If true then the Channel is closed automatically and immediately on write failure.
     *
     * @param tcpClient     the targeted tcp client
     * @param connectionMap the key-value pair that keeps all the http connection configuration.
     * @return the tcp client
     */
    protected TcpClient initTcpClient(TcpClient tcpClient, Map<String, String> connectionMap) {
        boolean keepAlive=env.getProperty(KEEP_ALIVE_KEY, Boolean.class, SO_KEEP_ALIVE_STATUS);
        return tcpClient
                .option(ChannelOption.SO_KEEPALIVE, keepAlive)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
                        Integer.valueOf(connectionMap.get(TIMEOUT)))
                .doOnConnected(conn -> {
                        long readWriteTimeout=Long.parseLong(connectionMap.get(SO_TIMEOUT));
                        conn.addHandlerLast(
                        new ReadTimeoutHandler(readWriteTimeout, TimeUnit.MILLISECONDS)).addHandlerLast(new WriteTimeoutHandler(readWriteTimeout, TimeUnit.MILLISECONDS));
                });
    }

    protected Function<TcpClient, TcpClient> getTcpMapper(Map<String, String> connectionMap) {
        return (TcpClient tcpClient) -> {
            tcpClient = initTcpClient(tcpClient, connectionMap);
            return tcpClient;
        };
    }
@violetagg
36 replies
JakubJecminek
@JakubJecminek
hi all,
i have question about graceful shutdown. It seems to have problem with persistent http connections as when shutdown is initiated, new requests are still incoming through already opened http connections. Is there some way how to configure closing of existing persistent http connections? Does current (0.9.9) implementation of reactor-netty even handles persistent connections?
1 reply
Aniket Panchariya
@aniketpanchariya
Screen Shot 2020-07-15 at 23.36.00.png
Chinthaka Dharmasiri
@chinthakadd
Has anyone tried configuring Http/2 with WebClient?
reactor/reactor-netty#639
Reading thru this, it seems seems like the support is enabled recently in reactor-netty. But is there a compatible web flux version that you can recommend? @violetagg
Valentin Iliev
@ilievvalentin
How can you see wich TCP cleint is sending you a message and send a message back to the specific client. In spring tcp integration it was easy with the connectionId header.
14 replies
Peter Jurkovic
@peterjurkovic
Hi there, it is possible to enable followRedirects without stripping Authorization header? (is there any config option) ?
16 replies
Alan Zimmer
@alzimmermsft

Hi, I'm trying to add a custom ProxyHandler to handle Digest authentication along with Basic authentication. It appears that my current configuration is causing a new TCP connection and SSL session on every request. This is how I'm currently adding it into the pipeline:

if (proxyHandler != null) {
                /*
                 * Configure the request Channel to be initialized with a ProxyHandler. The ProxyHandler is the first
                 * operation in the pipeline as it needs to handle sending a CONNECT request to the proxy before any
                 * request data is sent.
                 */
                tcpClient = tcpClient.bootstrap(bootstrap -> BootstrapHandlers
                    .updateConfiguration(bootstrap, NettyPipeline.ProxyHandler, (connectionObserver, channel) ->
                        channel.pipeline().addFirst(NettyPipeline.ProxyHandler, proxyHandler)));

            }

Is there a different way to add this where it won't trigger a new TCP connection and SSL session each time?

violetagg
@violetagg
@alzimmermsft I replied here reactor/reactor-netty#1213
Tell me if this is not what you need
Ismail Marmoush
@IsmailMarmoush

Hello,
How do you set the http status code as in response.status(200) according to result of HttpServerRequest ? , I couldn't find any examples of doing so, maybe I missed it.

Anyway, the examples provided on github and documents, shows

HttpServer.create() .port(0) .route(routes ->    routes.post("/test/{param}", (request, response) ->
                          response.sendString(request.receive()
                                                     .asString()
...

But in real life example I'd use that string to call DB and return result which in turn could be a string or let's say an error, so now I can't control the response.status(200).sendString( requestFlux) because as you see the request flux is inside sendString.

I used to reverse it, and use response inside the request flux, and do a then() call, but I'm not sure if that's the right way.

Thanks in advance!

violetagg
@violetagg
yes you can do that if you depend on the request
Ismail Marmoush
@IsmailMarmoush
you mean reversing and using then() ?
violetagg
@violetagg
yes
Ismail Marmoush
@IsmailMarmoush
alright great, thanks a lot !
yangbongsoo
@yangbongsoo
@violetagg Hello I shared with you that there were many Major GCs in the past. Heap Memory was 1G.
Do you think we should increase the Heap Memory while doing the version-up?
The question is wrong. Is there a possibility that the Heap Memory usage will increase during the version-up?
7 replies
rudolfv
@rudolfv
@violetagg We are replacing an Apache Camel based API gateway with Spring Cloud Gateway and are encountering "Connection reset" errors that we are not able to reliably reproduce and didn't see with Apache Camel. The clients use Apache HTTP client pools and I have configured an idle connection eviction timeout of 60 seconds which seems to have solved the problem, though I can't say for sure. Can you please provide some insight into why this would happen with Spring Cloud Gateway and not Camel in terms of the differences in the way server side connections are handled?
39 replies
eighty4
@eighty4
I'm getting a very big memory leak with each reactor-netty request I make. I've used bare reactor-netty on a few projects, but have never any leaks or services that stop responding to clients. The app stops responding to requests after 5-6 requests after boot and the memory footprint of my app goes from cold start 60mb to 500mb after a few requests. There's no log output or errors and I couldn't get much out of my first heap dump dive. I also haven't been able to properly configure log levels for reactor-netty using my log4j2.xml (like a <Logger name="reactor.netty" level="DEBUG"/> logger). Anyone here have experience with this?
1 reply
Fabian Beier-Trampusch
@trampi
Hi! I am using reactor-netty (without spring) to write a small UDP server for hobby purposes in the gaming area. I am somewhat familiar with the thinking behind reactive streams (rxjs) but not yet very familiar with project reactor. I want to have that UdpEventReceiver-class that allows access to a Flux<Event>. Event is something I received via UDP and mapped to something more useful for general consumers in my application. How can I "publish" the results of the mapping of a UDP server?
class UdpEventReceiver {

  private final Connection connection;
  Flux<Event> events; // <-- how do I get the events from the last handler mapping here?

  UdpEventReceiver(...) {
            this.connection = UdpServer.create()
            .host("localhost")
            .port(14502)
            .doOnBind(c -> System.out.println("on bind " + c))
            .doOnBound(x -> System.out.println("do on bound" + x))
            .doOnUnbound(x -> System.out.println("do on unbound " + x))
            .handle((in, out) -> in.receive()
                    .flatMap(this::parseJson)
                    .window(Duration.ofSeconds(10))
                    .flatMap(this::groupByKiller) // just a computer game, please do not be afraid ;-)
                    .then() // <-- so I would ike to publish the results up to here in this.events
            )
            .bindNow(Duration.ofMillis(500));
  }

}
5 replies
Bradley L Schatz
@blschatz
Hi, I'm trying to use the new Unix Domain Socket transport in 1.0.0.M1, but cant seem manage it. My test:
  DisposableServer ts = TcpServer.create()
          .bindAddress(() -> new DomainSocketAddress("/Users/bradley/test.sock"))
          .bindNow();
Caused by: java.lang.IllegalArgumentException: Unsupported channel type: ServerDomainSocketChannel
at reactor.netty.resources.DefaultLoopNIO.getChannel(DefaultLoopNIO.java:50)
at reactor.netty.resources.LoopResources.onChannel(LoopResources.java:206)
at reactor.netty.tcp.TcpResources.onChannel(TcpResources.java:185)
at reactor.netty.transport.TransportConfig.lambda$connectionFactory$0(TransportConfig.java:263)
at reactor.netty.transport.TransportConnector.doInitAndRegister(TransportConnector.java:168)
at reactor.netty.transport.TransportConnector.bind(TransportConnector.java:74)
at reactor.netty.transport.ServerTransport.lambda$bind$0(ServerTransport.java:110)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.core.publisher.Mono.subscribe(Mono.java:4219)
at reactor.core.publisher.Mono.block(Mono.java:1702)
at reactor.netty.transport.ServerTransport.bindNow(ServerTransport.java:144)
at reactor.netty.transport.ServerTransport.bindNow(ServerTransport.java:129)
at io.netifi.rsocket.example.service.Main.main(Main.java:25)
5 replies
Looks like I'm getting a NIO event loop, despite having the netty-transport-native-kqueue in my classpath. Any suggestions? Thanks!
jdoherty
@jdoherty
Hi folks - I am something of a newbie to webclient / netty / spring webflux. But what I am trying to achieve seems simple enough and for the life of me I cannot understand why it doesnt work. I basically want to observe request / response bodies and headers when I stick it on debug mode. I followed the tutorial on baeldung (https://www.baeldung.com/spring-log-webclient-calls) - and I can see the logRequest and logResponse kicking in and logging headers. But no matter what I try - my custom logger does not seem to get bootstrapped. I do not want the hex dump - this is what I want rid of (or even better a way to toggle on / off) - I saw on the netty issues a nice feature was implemented for this - ByteBufFormat.SIMPLE.
My webclient setup seems very standard
return WebClient.builder().
exchangeStrategies(ExchangeStrategies.builder()
.codecs(configurer -> {
configurer.defaultCodecs().maxInMemorySize(Integer.parseInt(webClientMaxInMemorySize));
configurer.defaultCodecs().enableLoggingRequestDetails(true);
})
.build())
.clientConnector(new ReactorClientHttpConnector(httpClient))
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED_VALUE)
.filter(logRequest())
.filter(logResponse())
.build();
HttpClient:
private HttpClient getHttpClient(){
return HttpClient
.create()
.tcpConfiguration(
tc -> tc.bootstrap(
b -> BootstrapHandlers.updateLogSupport(b, new CustomLogger(HttpClient.class)))
.wiretap(true));
}
custom logger
public CustomLogger(Class<?> clazz) {
super(clazz);
}
@Override
protected String format(ChannelHandlerContext ctx, String event, Object arg) {
    if (arg instanceof ByteBuf) {
        ByteBuf msg = (ByteBuf) arg;
        return decode(
                msg, msg.readerIndex(), msg.readableBytes(), StandardCharsets.UTF_8);
    }
    return super.format(ctx, event, arg);
}

private String decode(ByteBuf src, int readerIndex, int len, Charset charset) {
    if (len != 0) {
        byte[] array;
        int offset;
        if (src.hasArray()) {
            array = src.array();
            offset = src.arrayOffset() + readerIndex;
        } else {
            array = allocateUninitializedArray(max(len, 1024));
            offset = 0;
            src.getBytes(readerIndex, array, 0, len);
        }
        return new String(array, offset, len, charset);
    }
    return "";
}
but for some reason this logger never seems to get called - it seems something behind the scenes to do TCP client when calling HttpClient.create() is setting the ByteBufFormat to HEX.
Am I missing something fundamental / obvious? thanks for any help you can provide
29 replies
Matt Smith
@00-matt
Hi all :) Are there any larger examples demonstrating how to use the TcpServer? I had a look at the DiscardServer and EchoServer examples but I am a bit lost on the best way to do framing and decoding messages (like with just plain Netty I would use aDelimiterBasedFrameDecoder and create a MyProtocolCodec), and also how to store a bit of state for each connection. Thank you
7 replies
Matt Smith
@00-matt
Reading some of the past messages here - would a Processor replace a netty codec?
yangbongsoo
@yangbongsoo

@violetagg Hello. I have a question about timeout issue. I read reactor/reactor-netty#1159 .
in the issue, He said Tcp level timeout has problems. but I don't understand exactly below sentence.

"They apply even when an HTTP request is not being processed. For example, they could cause a connection sitting in the connection pool to be closed, even though it might be able to be used a split-second later by another request."

How could ReadTimeoutHandler/WriteTimeoutHandler apply when HTTP request is not being processed ??

37 replies
Alessandro Vermeulen
@spockz
I have been looking into how reactor-netty implements backressure for the http client. It appears as if it translates back pressure on the tcp stream to the http requests. Is this the case? Or does it (also?) work on response codes and headers?
4 replies
yangbongsoo
@yangbongsoo

@violetagg Hi I watched reactor/reactor-netty#1246 your pull request.
in scg, they have spring.cloud.gateway.httpclient.response-timeout property. When I set that propery,
they add timeout value in .timeout() operator on the reactive stream.

        Duration responseTimeout = getResponseTimeout(route);
        if (responseTimeout != null) {
            responseFlux = responseFlux
                    .timeout(responseTimeout, Mono.error(new TimeoutException(
                            "Response took longer than timeout: " + responseTimeout)))
                    .onErrorMap(TimeoutException.class,
                            th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
                                    th.getMessage(), th));
        }

I think It makes response timeout understand hard.

3 replies
Alif Bhaskoro
@xpanda17

hi @violetagg , im new on reactive programming
So I have a service which will call an external API. Im doing load testing currently, where I mocked the external APIs to return a response with delay ~200-300ms.
Upon doing load test with ~1k-2k users, I found that my code that call the external API finished in more than 300ms (usually 600-800ms, with max 2s)

I tried to call the mocked external API (using curl) when load testing is running. It returns in ~300ms [means the problem is not in the mock service)

My guess is my worker thread is super busy, I tried to increase the worker thread size by using -Dreactor.netty.ioWorkerCount=128, but it doesnt seem to help.
Can you enlight me about this problem? Currently idk what to do and where to start investigating because im new to reactive programming

Stack: spring-boot-starter-webflux which is build on top reactor-netty:0.9.7 and I'm using webclient btw

1 reply
Jonas Höglin
@jhoglin
Hello, I'm looking at the new feature in Spring 2.3.2 using webflux to gracefully shutdown. I'm surprised about the default behavior that if I simulate a long request for say 60sec, and set a timeout-per-shutdown-phase: 20s. The server on a Ctrl + c nicely starts a graceful shutdown. But since my request wont complete in the 20s, the graceful shutdown aborts and the server stays around to complete the request. Is this by design or can one change this behavior? I would expect the server to force kill any outstanding requests and shut down after the grace period.
10 replies
Scott White
@kibbled
is there an easy way to go from Java’s SSLContext to Netty’s SslContext?