Question about logging: why is it that none of the HTTP/2 pipelines add the metricsRecorder (HttpClientMetricsRecorder) to the end of the pipeline? I see this being done for HTTP/1, but not for H2. For reference, I don't see it being added here: https://github.com/reactor/reactor-netty/blob/baff0517529357a4e45b79d900114971519e1a68/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java#L506
Note: I do see my custom metrics class being invoked for the methods inherited from ChannelMetricsRecorder, but not for those from HttpClientMetricsRecorder. Thanks!
Topic: Websockets - detecting client close (or more broadly, any close of the websocket).
The websocket reactive stream can be closed, which in turn closes the websocket (WebsocketSubscriber.onComplete() calls WebsocketServerOperations.sendCloseNow() if the websocket is still active).
However, when the client closes the websocket, the reactive stream isn't closed (I would have assumed that WebsocketSubscriber.onSubscribe() would store the subscription in a field and then call 'subscription.cancel()' when the client closes the websocket).
The issue I've got is that I want to maintain a pool of active websockets to broadcast messages to, and I need to be able to remove websockets from this pool when they disconnect. The API doesn't really allow for this, which I would have assumed was a very standard use case.
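To make the use case concrete, here is a minimal sketch of the pool itself, independent of any Reactor Netty API. SessionRegistry and its method names are hypothetical, and how the "closed" signal is obtained is left as an assumption: the Runnable returned by register() would have to be wired to whatever close notification the library exposes (a connection-disposed callback would be one candidate).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical registry of active sessions; not part of Reactor Netty.
final class SessionRegistry {

    // Session id -> function that writes a text message to that session.
    private final Map<String, Consumer<String>> senders = new ConcurrentHashMap<>();

    /** Registers a session and returns the callback to run when that session closes. */
    Runnable register(String sessionId, Consumer<String> sender) {
        senders.put(sessionId, sender);
        // Remove only this exact registration, so a later re-registration
        // under the same id is not removed by a stale close callback.
        return () -> senders.remove(sessionId, sender);
    }

    /** Sends the message to every registered session and returns how many received it. */
    int broadcast(String message) {
        int delivered = 0;
        for (Consumer<String> sender : senders.values()) {
            sender.accept(message);
            delivered++;
        }
        return delivered;
    }

    int size() {
        return senders.size();
    }

    public static void main(String[] args) {
        SessionRegistry registry = new SessionRegistry();
        Runnable onCloseA = registry.register("a", msg -> System.out.println("a <- " + msg));
        registry.register("b", msg -> System.out.println("b <- " + msg));
        registry.broadcast("hello");          // reaches a and b
        onCloseA.run();                       // simulate session "a" disconnecting
        registry.broadcast("after a closed"); // reaches only b
    }
}
```

The only thing the pool needs from the websocket layer is a reliable point at which to run the returned callback, which is exactly the hook being asked about.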
Hi Team, we have an application with Spring Boot 2.6.4 and WebFlux. We are currently facing a java.lang.OutOfMemoryError: Direct buffer memory
Original Stack Trace:
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
at io.netty.buffer.UnpooledDirectByteBuf.allocateDirect(UnpooledDirectByteBuf.java:104) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.UnpooledDirectByteBuf.capacity(UnpooledDirectByteBuf.java:156) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:305) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:280) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1103) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:99) ~[netty-codec-4.1.74.Final.jar:4.1.74.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:277) ~[netty-codec-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [netty-transport-classes-epoll-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [netty-transport-classes-epoll-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [netty-transport-classes-epoll-4.1.74.Final.jar:4.1.74.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [netty-common-4.1.74.Final.jar:4.1.74.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.74.Final.jar:4.1.74.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.74.Final.jar:4.1.74.Final]
at java.lang.Thread.run(Unknown Source)
I have tried -Dio.netty.leakDetection.level=paranoid
but it didn't give any leak details. Can you please guide or help me to investigate?
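One thing that can help narrow this down: the stack trace above goes through ByteBuffer.allocateDirect and java.nio.Bits.reserveMemory, so these allocations are counted by the JDK's own direct buffer accounting. The sketch below reads that counter through the standard BufferPoolMXBean; it only uses JDK classes. DirectBufferStats is a hypothetical helper name, and logging this periodically next to request rates is a suggestion, not a confirmed diagnosis of the leak.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; reads the JDK's buffer pool counters (for example "direct" and "mapped").
final class DirectBufferStats {

    /** Returns the bytes currently used by each JDK buffer pool, keyed by pool name. */
    static Map<String, Long> usedBytesByPool() {
        Map<String, Long> used = new LinkedHashMap<>();
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            used.put(pool.getName(), pool.getMemoryUsed());
        }
        return used;
    }

    public static void main(String[] args) {
        System.out.println("before: " + usedBytesByPool());
        // Allocate 1 MiB of direct memory purely to show the counter moving.
        ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20);
        System.out.println("after:  " + usedBytesByPool());
        System.out.println("allocated capacity: " + buffer.capacity());
    }
}
```

If the "direct" pool keeps growing towards the configured maximum while traffic is steady, that points at buffers being retained rather than at a single oversized request.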
I'm currently investigating a memory leak with Netty and Spring WebFlux.
The logs don't contain the connection id, so I can't tell which connection has the memory leak.
I have the following enabled: io.netty.leakDetection.targetRecords=40, -Dio.netty.leakDetection.level=paranoid, logging.level.reactor.netty.channel.FluxReceive=debug
But this is the log I get:
LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
Hint: 'reactor.right.reactiveBridge' will handle the message from this point.
io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:86)
I expected something that includes the connection id, like this:
LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
Hint: [id: 0x0f9d0570, L:/10.36.23.97:48846 - R:www.googleapis.com/108.177.111.95:443] Buffered ByteBufHolder in Inbound Flux Queue
io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:86)
Any ideas whether I'm doing something wrong, and how I can get the connection id?
Trying to pin down an issue causing an intermittent
io.netty.handler.codec.EncoderException: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
The only place in the entire project where I deal with ByteBufs is in this method (httpRequest is a ResponseReceiver):
public Mono<String> getRequest() {
    return httpRequest.responseSingle((response, byteBufMono) -> {
        // 2xx: return the body as a string
        if (response.status().code() / 100 == 2) {
            return byteBufMono.asString(StandardCharsets.UTF_8);
        }
        // anything else: read the body (if any) and fail with it
        else {
            return byteBufMono.asString(StandardCharsets.UTF_8)
                    .switchIfEmpty(Mono.just(""))
                    .flatMap(data -> Mono.error(FailureStrategies.makeWebException(response, data)));
        }
    });
}
The only thing I can think of is that I call getRequest() multiple times with the same ResponseReceiver if I get a 500 or similar, but that retry system works most of the time, so I don't know why that would be the issue.
Hi All - I saw that reactor-netty-1.0.15 introduced two server metrics that I'd like to expose in a spring-boot-webflux application (reactor.netty.http.server.connections.active and reactor.netty.http.server.connections.total). I'd like to avoid turning on the other reactor.netty.http.server.* metrics because a number of them are tagged with URIs, which will generate high cardinality in my metrics, and I already have spring-boot generating metrics that include templated URIs for me.
Is it possible to selectively register specific metrics that will be exposed? To avoid the high cardinality, I registered a NettyServerCustomizer bean that registers all the metrics with a URI tag as "":
public class NettyServerMetricsCustomizer implements NettyServerCustomizer {
    @Override
    public HttpServer apply(HttpServer httpServer) {
        return httpServer.metrics(true, s -> "");
    }
}
But this seems less than ideal. I appreciate any help/suggestions!
Hi,
I'm trying to figure out the cause of an issue that has also been reported on Stack Overflow. This is what I get in my logs:
"2022-03-22 11:53:38.061 DEBUG 1 --- [ parallel-1] o.s.w.r.f.client.ExchangeFunctions : [6fc37ed8] HTTP POST https://myserver.info/oauth/v2/token\n",
"2022-03-22 11:53:48.048 DEBUG 1 --- [or-http-epoll-4] o.s.w.r.f.client.ExchangeFunctions : [6fc37ed8] Cancel signal (to close connection)\n"
I have not set any timeout configuration, so it seems that some kind of timeout occurs (always after 10 seconds). There are no exceptions in the logs; it just seems that the connection is closed. I tried to reproduce this behavior by simulating a timeout or a connection close on the server, but in such cases I always get an error and an exception in the logs.
Do you have any idea what could be happening to the connection, or how I can reproduce this error?
Thank you very much.
ContextAwareHttpMetricsRecorder methods? What I am trying to do is add the TLS protocol and cipher names as dimensions to the TLS handshake metrics in ContextAwareHttpMetricsRecorder. It has access to ContextView, but I'm not sure how I can put TLS-related data into it. The SslHandler in the Netty pipeline has these details, but I'm not sure how to access it.
Hi Team, with Spring Boot 2.6.4 I'm trying to print the WebClient response in the logs (in both success and failure cases), using the option below.
var httpClient = HttpClient.create(connectionProvider)
        .wiretap(getClass().getSimpleName() + " Request/Response : ", LogLevel.INFO, AdvancedByteBufFormat.TEXTUAL);
Is it fine to have the above code in production, or does it have a performance impact?
@violetagg
I'm using the reactor-netty HttpClient with a load balancer. The LB has a default connection timeout of 180s. I want to simulate a "connection reset by peer" error, reset by the LB specifically. I assumed that if I configure the client with
maxIdleTime(Duration.ofSeconds(360))
.maxLifeTime(Duration.ofSeconds(360))
it would reproduce this error a lot, but I can hardly see any. I got around 1 or 2 over 24 hours, and I'm not even sure they were related to these settings.
What kind of setting can help me reproduce this error? And how does maxIdleTime affect the client when a connection exceeds the configured value?
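A way to reason about the timing, as a sketch rather than a confirmed answer: the client can only run into a connection that the LB has already dropped if it reuses a pooled connection that has been idle for longer than the LB timeout (180s) but not yet long enough for the pool to discard it (maxIdleTime, 360s). Requests that arrive more often than every 180s keep the connection alive on both sides. The class and method names below are hypothetical, and whether the LB answers with a reset or silently drops packets is an assumption about that particular LB.

```java
import java.time.Duration;

// Hypothetical helper that only encodes the timing argument above.
final class StaleConnectionWindow {

    /**
     * Returns true when a pooled connection that has been idle for idleGap is
     * already past the load balancer's idle timeout but still within the
     * client pool's maxIdleTime, so the client would try to reuse it.
     */
    static boolean reusesStaleConnection(Duration idleGap, Duration lbIdleTimeout, Duration clientMaxIdleTime) {
        return idleGap.compareTo(lbIdleTimeout) > 0 && idleGap.compareTo(clientMaxIdleTime) < 0;
    }

    public static void main(String[] args) {
        Duration lb = Duration.ofSeconds(180);
        Duration client = Duration.ofSeconds(360);
        // 10s between requests: both sides still consider the connection active.
        System.out.println(reusesStaleConnection(Duration.ofSeconds(10), lb, client));  // false
        // 200s between requests: the LB has timed out, the client pool has not.
        System.out.println(reusesStaleConnection(Duration.ofSeconds(200), lb, client)); // true
        // 400s between requests: the client pool discards the connection first.
        System.out.println(reusesStaleConnection(Duration.ofSeconds(400), lb, client)); // false
    }
}
```

Under these assumptions, a test loop would need to pause somewhere between 180s and 360s between requests on the same pooled connection to hit the window regularly.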
ReactorNettyWebSocketClient - Connecting to ws://192.168.0.101:11024/websocket/ws?accessToken=456
2022-04-10 21:43:06,408 [reactor-http-nio-3] ERROR o.s.w.s.a.HttpWebHandlerAdapter - [61cd3240-9] Error [java.lang.UnsupportedOperationException] for HTTP GET "/api/websocket/ws?accessToken=456", but ServerHttpResponse already committed (200 OK)
spring-boot : 2.3.12 RELEASE
reactor-netty : 0.9.20 RELEASE
ConnectionProvider provider = ConnectionProvider.builder("custom")
        .maxConnections(16)
        .maxIdleTime(Duration.ofSeconds(100))
        .maxLifeTime(Duration.ofSeconds(100))
        .build();
HttpClient httpClient = HttpClient.create(provider)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
        .responseTimeout(Duration.ofMillis(5000))
        .doOnConnected(conn ->
                conn.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS))
                    .addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS)))
        .wiretap("reactor.netty.http.client.HttpClient",
                LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL);
httpClient.warmup()
        .block();
while (true) {
    httpClient.post()
            .uri("https://httpbin.org/post")
            .send(ByteBufFlux.fromString(Mono.just("hello")))
            .response()
            .block();
    try {
        Thread.sleep(10_000);
    } catch (InterruptedException e) {
        e.printStackTrace();
        break;
    }
}
Hi
I'm trying to do some benchmarking with Spring Boot using HTTP/1 and HTTP/2,
and I see a weird behaviour.
Spring Boot 2.6.6
HTTP/2 is a bit slower than HTTP/1; it looks like it is not taking advantage of a single TCP connection with multiplexing.
My Server:
I have enabled HTTP/2 in the properties:
server.http2.enabled= true
and when using curl
curl -I --http2 http://localhost:8080/sms-request/test
I get:
HTTP/1.1 101 Switching Protocols
connection: upgrade
upgrade: h2c
HTTP/2 200
content-type: text/plain;charset=UTF-8
content-length: 10
My client is another Spring Boot 2.6.6 application
and looks like this:
WebClient webClient = WebClient.create().mutate()
        .clientConnector(
                new ReactorClientHttpConnector(
                        HttpClient.create().protocol(HttpProtocol.H2C)))
        .baseUrl("http://localhost:8080/sms-request/test").build();
Flux.range(1, 100000000)
        .concatMap(integer -> webClient.get().retrieve().bodyToMono(String.class))
        .subscribe();
Using the default protocol I can process up to 23,959 requests per second;
with the H2C protocol I can barely reach 16,000 requests per second.
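One thing worth noting about this benchmark: concatMap subscribes to the next inner publisher only after the previous one has completed, so the client above never has more than one request in flight. HTTP/2 multiplexing only pays off when several streams are open at once. The JDK-only sketch below simulates this with a concurrency limit and a short sleep standing in for network latency. InFlightDemo and its numbers are illustrative assumptions, not a reproduction of the benchmark, and whether this accounts for the whole gap is unverified. In Reactor, the corresponding change would be flatMap with a concurrency argument in place of concatMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative simulation only; no HTTP is involved.
final class InFlightDemo {

    /** Runs total simulated requests with at most concurrency outstanding and returns the peak in-flight count. */
    static int peakInFlight(int total, int concurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        Semaphore permits = new Semaphore(concurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < total; i++) {
                // Wait for a free slot before issuing the next request.
                permits.acquireUninterruptibly();
                pending.add(CompletableFuture.runAsync(() -> {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(5); // stand-in for network latency
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                    }
                }, pool));
            }
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Concurrency 1 mirrors concatMap: the next request starts only after the previous one ends.
        System.out.println("sequential peak in flight: " + peakInFlight(20, 1));
        // A higher limit allows overlapping requests, which is what multiplexing needs.
        System.out.println("concurrent peak in flight: " + peakInFlight(40, 8));
    }
}
```

With one request in flight at a time, throughput is bounded by per-request latency, so any extra per-request overhead of the H2C path shows up directly in the numbers.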
Hi,
I have a Spring Cloud Gateway application with a filter that does a POST to a third-party service. It works when I send the body
as Mono.empty(), but fails when I send a simple JSON request body.
Spring Cloud Gateway version: 2.2.6.RELEASE
"errorMessage": "java.lang.IllegalStateException: unexpected message type: PooledUnsafeDirectByteBuf"
"exception":"io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: PooledUnsafeDirectByteBuf
io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346)
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
reactor.netty.channel.MonoSendMany$SendManyInner.run(MonoSendMany.java:317)
reactor.netty.channel.MonoSendMany$SendManyInner.trySchedule(MonoSendMany.java:419)
reactor.netty.channel.MonoSendMany$SendManyInner.onNext(MonoSendMany.java:219)
org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:90)
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:90)
reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:96)
reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2344)
reactor.core.publisher.FluxContextStart$ContextStartSubscriber.request(FluxContextStart.java:125)
org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:76)
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:76)
reactor.netty.channel.MonoSendMany$SendManyInner.onSubscribe(MonoSendMany.java:250)
org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:69)
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:69)
reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onSubscribe(FluxContextStart.java:90)
reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
reactor.core.publisher.Flux.subscribe(Flux.java:8325)
reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:102)
reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153)
reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
reactor.core.publisher.Mono.subscribe(Mono.java:4213)
reactor.netty.NettyOutbound.subscribe(NettyOutbound.java:331)
reactor.core.publisher.MonoSource.subscribe(MonoSource.java:65)
reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
This is the truncated exception message I am seeing. Can someone please help me figure out why this could be happening?
Thanks
I have a bit of code (below) that depends on reactor-netty:0.9.20.RELEASE but no longer compiles after upgrading to 1.0.17. I saw a reference to this snippet as a potential fix in an earlier thread but, as a casual maintainer, it's not obvious to me whether that covers (2) below (i.e. setting the channel Class).
Any help is greatly appreciated. Thanks in advance.
fun webServerFactory(): NettyReactiveWebServerFactory {
    val eventLoopGroup = /* omitted for brevity */
    val serverChannelClass = /* omitted for brevity */
    val eventLoopCustomizer = NettyServerCustomizer { server ->
        server.tcpConfiguration { tcpServer ->
            tcpServer.bootstrap { serverBootstrap ->
                serverBootstrap
                    .group(eventLoopGroup) // <-- (1) custom EventLoopGroup
                    .channel(serverChannelClass) // <-- (2) custom channel Class
            }
        }
    }
    return NettyReactiveWebServerFactory().also { factory ->
        factory.serverCustomizers = listOf(eventLoopCustomizer)
    }
}
2022-04-23T16:51:52,737 [reactor-tcp-epoll-3] WARN r.n.c.FluxReceive {} - An exception has been observed post termination, use DEBUG level to see the full stack: io.netty.handler.codec.DecoderException: javax.net.ssl.SSLException: Error finalising cipher data: mac check in GCM failed
Hi, I am currently trying to make ECDSA-related ciphers work with TLS 1.2 in Spring Cloud Gateway (Spring Boot Parent 2.6.7 and Spring Cloud 2021.0.2). Here's the snippet of the WebServerFactoryCustomizer:
@Bean
public WebServerFactoryCustomizer<NettyReactiveWebServerFactory> customizer() {
    return factory -> factory.addServerCustomizers(httpServer -> httpServer.secure(sslContextSpec -> {
        try {
            Ssl ssl = factory.getSsl();
            KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
            char[] keyStorePassword = ssl.getKeyStorePassword().toCharArray();
            keyStore.load(resourceLoader.getResource(ssl.getKeyStore()).getInputStream(), keyStorePassword);
            KeyManagerFactory keyManagerFactory = OpenSslCachingX509KeyManagerFactory
                    .getInstance(KeyManagerFactory.getDefaultAlgorithm());
            keyManagerFactory.init(keyStore, keyStorePassword);
            Http11SslContextSpec http11SslContextSpec = Http11SslContextSpec.forServer(keyManagerFactory)
                    .configure(sslContextBuilder -> {
                        sslContextBuilder.sslProvider(SslProvider.OPENSSL);
                        sslContextBuilder.ciphers(Arrays.asList(ssl.getCiphers()));
                        sslContextBuilder.protocols(ssl.getEnabledProtocols());
                        sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
                        sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
                    });
            sslContextSpec.sslContext(http11SslContextSpec)
                    .handlerConfigurator(sslHandler -> {
                        sslHandler.setCloseNotifyReadTimeout(18000, TimeUnit.MILLISECONDS);
                        sslHandler.setHandshakeTimeout(19000, TimeUnit.MILLISECONDS);
                        SSLParameters sslParameters = sslHandler.engine().getSSLParameters();
                        sslParameters.setUseCipherSuitesOrder(false);
                        sslHandler.engine().setSSLParameters(sslParameters);
                    });
        } catch (UnrecoverableKeyException | IOException | CertificateException | KeyStoreException |
                 NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }));
}
But when I try to connect using openssl s_client with the ECDHE-ECDSA-AES128-GCM-SHA256 cipher, the server returns a "no shared ciphers" error, even though I do have that cipher in the configuration:
server.ssl.ciphers=TLS_RSA_WITH_AES_128_GCM_SHA256,\
TLS_RSA_WITH_AES_256_GCM_SHA384, \
TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,\
TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,\
TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
server.ssl.enabled-protocols=TLSv1.2
This behavior was observed when I upgraded versions from Spring Boot 2.3.3.RELEASE and Spring Cloud Hoxton.SR7. Any advice/suggestions would be of great help on fixing or correctly configuring it.
Hi, we need some info about HttpClient usage. Is it possible to use the same HttpClient for multiple requests? In addition, can we set up custom headers for every request? I mean, is something like this possible:
HttpClient client = HttpClient.create(connectionProvider)
        .host(endpointHost)
        .port(endpointPort)
        ...
// First request
String response1 = client.headers(h -> h.add(request.getRequestHeaders().copy()))
        .request(HttpMethod.GET)
        .uri("http://example1.com/")
        .send(request.getRequestData())
        ..
// Second request
String response2 = client.headers(h -> h.add(request.getRequestHeaders().copy()))
        .request(HttpMethod.GET)
        .uri("http://example2.com/")
        .send(request.getRequestData())
        ..
Thanks in advance.
Hi, I'm facing some connection issues in a Kubernetes cluster where Netty acts as a server.
The clients are traditional Spring MVC applications which use a standard RestTemplate.
Sometimes, when the clients try to connect to an application based on Reactor Netty, they obtain two kinds of exceptions:
There are other cases where the connection is successful, but the client connection reaches the Netty server only after a lot of time (30 seconds or more).
The Netty server is NOT under heavy load and no errors are present in the logs.
Moreover, this seems to happen when the Netty server has been idle for several hours, i.e. the incoming connection is the first one after a long period of inactivity.
Has anyone experienced the same issue?
Does any server-side configuration parameter exist in Netty that could mitigate this issue?
Hi, I am using WebClient to call services from a Java Spring Boot application. While making a call to a service using WebClient, I am getting SslHandshakeTimeoutException: handshake timed out after 10000ms
Then I set handshakeTimeoutMillis to 18000. Now I am getting an I/O exception: An existing connection was forcibly closed by the remote host.
I tried setting up ConnectionProvider.fixed("test", 64). It did not work. Can somebody please help me resolve this issue? I would be grateful!
I am using reactor-netty version 0.9.17.RELEASE
SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
ClientHttpConnector httpConnector = new ReactorClientHttpConnector(HttpClient.create().secure(t -> t.sslContext(sslContext).handshakeTimeoutMillis(18000)));
And I have used httpConnector to build the WebClient.
Error: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response
We are not able to reproduce it in the Dev environment. Debugging with the logs, we see that it occurred after we sent data to Tomcat and before we received the full response. Do you have any idea how I might approach this problem?