violetagg
@violetagg
ConnectionProvider connectionProvider = ConnectionProvider.builder(serviceConfiguration.getName() + "_" + apiConfiguration.getName())
        .maxConnections(apiConfiguration.getConcurrency())
        .maxIdleTime(Duration.ofSeconds(3600))
        .pendingAcquireTimeout(Duration.ofMillis(5))
        .pendingAcquireMaxCount(1)
        //.metrics(true)
        .lifo()
        .build();
This is interesting: you allow only 1 pending request and the acquire timeout is 5 ms. Is that correct and intentional?
apiConfiguration.getConcurrency(): how many connections is this?
How does it behave with the default ConnectionProvider (max 500 connections, 45 s acquire timeout, unbounded pending requests)?
"a new thread on each request": I don't get this.
By default Reactor Netty runs with a number of threads equal to the number of cores, or at least 4.
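A minimal sketch of that default sizing rule, for illustration only. The class and method names here are hypothetical and this is not Reactor Netty's source; it just restates "number of cores, or at least 4" as code.

```java
/** Illustrative only: restates the default event loop sizing rule described above. */
final class DefaultWorkerCount {

    /** Returns the worker thread count for a given core count: the cores, but never fewer than 4. */
    static int workerCount(int availableProcessors) {
        return Math.max(availableProcessors, 4);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + cores + " workers=" + workerCount(cores));
    }
}
```

With this rule a 2-core machine still gets 4 event loop threads, while a 16-core machine gets 16, which is far below a thread-per-request pool.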
vikasvb90
@vikasvb90
Similar issues with the default ConnectionProvider. apiConfiguration.getConcurrency() is configured per API; currently the value is 500 connections for every API. A thread count equal to the number of cores is much lower than with the Hystrix thread pool, where every request was executed in a new thread.
We can clearly see a significant difference in the number of threads being used, though:
from 4000 down to 150
Does pendingAcquireMaxCount denote the number of pending connections to be acquired or pending requests?
violetagg
@violetagg
These are requests that need a connection from the pool.
What do you mean by pending connections?
If we have a connection in the pool, we provide it for executing the request. If we do not have a connection but are below max connections, we create one and provide it. If we do not have a connection and have reached max connections, the request waits for a connection to be freed, or expires when the acquire timeout is reached.
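The acquire decision described above can be sketched as a toy model. This is an assumption-laden illustration, not Reactor Netty's implementation: PoolModel, Outcome, and their members are hypothetical names, and the acquire timeout is not modelled (a queued request simply stays queued until a release).

```java
/** Toy model of the acquire decision; not Reactor Netty's actual pool code. */
final class PoolModel {

    enum Outcome { REUSED_IDLE, CREATED_NEW, QUEUED_PENDING, REJECTED_PENDING_LIMIT }

    private final int maxConnections;
    private final int pendingAcquireMaxCount;
    private int idle;     // connections sitting unused in the pool
    private int active;   // connections currently handed out to requests
    private int pending;  // requests waiting for a connection to be freed

    PoolModel(int maxConnections, int pendingAcquireMaxCount) {
        this.maxConnections = maxConnections;
        this.pendingAcquireMaxCount = pendingAcquireMaxCount;
    }

    /** Applies the rules in order: reuse an idle connection, create one, queue, or reject. */
    Outcome acquire() {
        if (idle > 0) {
            idle--;
            active++;
            return Outcome.REUSED_IDLE;
        }
        if (active < maxConnections) {
            active++;
            return Outcome.CREATED_NEW;
        }
        if (pending < pendingAcquireMaxCount) {
            pending++;
            return Outcome.QUEUED_PENDING;
        }
        return Outcome.REJECTED_PENDING_LIMIT;
    }

    /** Frees one active connection: it goes to a waiting request if any, otherwise back to idle. */
    void release() {
        if (active == 0) {
            throw new IllegalStateException("no active connection to release");
        }
        if (pending > 0) {
            pending--;      // the connection is handed straight to a waiter and stays active
        } else {
            active--;
            idle++;
        }
    }

    public static void main(String[] args) {
        PoolModel pool = new PoolModel(2, 1);
        for (int i = 0; i < 4; i++) {
            System.out.println("request " + i + ": " + pool.acquire());
        }
    }
}
```

With maxConnections at its limit and pendingAcquireMaxCount(1), at most one request waits and any further request is rejected immediately, which matches the behaviour confirmed in the exchange that follows.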
vikasvb90
@vikasvb90
So you are saying that in this scenario, when 500 connections are already active, only one request will wait for 5 ms and the rest will be rejected?
violetagg
@violetagg
yes
vikasvb90
@vikasvb90
OK, got it. But the problem here is that the connections never leave the active state. Let me attach the graph.
violetagg
@violetagg
What's the Reactor Netty version?
vikasvb90
@vikasvb90
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>0.9.6.RELEASE</version>
</dependency>
violetagg
@violetagg
So do the connections always stay active and never return to the pool, or do you just utilise all of them because of the load?
vikasvb90
@vikasvb90
no.. they are never returned to the pool
not because of the load
it has been 3 hours ..
violetagg
@violetagg
As I don't know how you use WebClient, please check whether this issue is relevant for your WebClient usage: spring-projects/spring-framework#24788
vikasvb90
@vikasvb90
Let me attach the code where I am using WebClient:
String path = resolvePath(apiConfiguration, request);
            URI uri = UriComponentsBuilder.fromUriString(serviceConfiguration.getBaseUrl()).path(path)
                    .queryParams(applyQueryParams(request)).build(true).toUri();
            func.apply(request)
                    .uri(uri)
                    .headers(new Consumer<HttpHeaders>() {
                        @Override
                        public void accept(HttpHeaders httpHeaders) {
                            if (request.getHeaders() == null) {
                                httpHeaders.addAll(getDefaultHeaders());
                            } else {
                                if (!request.getHeaders().containsKey("Content-Type")) {
                                    httpHeaders.add("Content-Type", "application/json");
                                }
                                request.getHeaders().forEach(httpHeaders::add);
                            }
                        }
                    })
                    .body(request.getBody() == null ? null : BodyInserters.fromValue(request.getBody()))
                    .exchange()
                    .timeout(Duration.ofMillis(apiConfiguration.getRequestTimeout()), Mono.error(new TimeoutException()))
                    .doOnError(throwable -> throwable instanceof io.netty.handler.timeout.TimeoutException, throwable -> {
                        if (throwable instanceof ReadTimeoutException) {
                            log.error("Service {}  api {} ReadTimeoutException  in flux ", serviceConfiguration.getName(),
                                    apiConfiguration.getPath(), throwable);
                            logTimeTaken(startTime, throwable);
                            responseHandler.handleError(null, new EmissaryAsyncServerTimeOutException());
                        } else if (throwable instanceof WriteTimeoutException) {
                            log.error("Service {}  api {} WriteTimeoutException  in flux", serviceConfiguration.getName(),
                                    apiConfiguration.getPath(), throwable);
                            logTimeTaken(startTime, throwable);
                            responseHandler.handleError(null, new EmissaryAsyncServerTimeOutException());
                        } else {
                            log.error("Service {}  api {} io.netty.handler.timeout.TimeoutException  in flux", serviceConfiguration.getName(),
                                    apiConfiguration.getPath(), throwable);
                            logTimeTaken(startTime, throwable);
                            responseHandler.handleError(null, new EmissaryAsyncServerTimeOutException());
                        }
                    })
                    .doOnError(throwable -> throwable instanceof PoolAcquireTimeoutException, throwable -> {
                        log.error("Service {}  api {} PoolAcquireTimeoutException  error in merging flux", serviceConfiguration.getName(),
                                    apiConfiguration.getPath(), throwable);
                        logTimeTaken(startTime, throwable);
                        responseHandler.handleError(null, new EmissaryAsyncServerTimeOutException());
                    })
                    .doOnError(throwable -> throwable instanceof PoolAcquirePendingLimitException, throwable -> {
                        log.error("Service {}  api {} PoolAcquirePendingLimitException  error in merging flux", serviceConfiguration.getName(),
                                    apiConfiguration.getPath(), throwable);
                        logTimeTaken(startTime, throwable);
                        responseHandler.handleError(null, new EmissaryAsyncServerTimeOutException());
                    })
                    .doOnError(throwable -> throwable instanceof ConnectTimeoutException, throwable -> {
                        log.error("Service {}  api {} ConnectTimeoutException  error in merging flux", serviceConfiguration.getName(),
                                    apiConfiguration.getPath(), throwable);
                        logTimeTaken(startTime, throwable);
                        responseHandler.handleError(null, new EmissaryAsyncServerTimeOutException());
                    })
                    .doOnError(throwable -> throwable instanceof TimeoutException, throwable -> {
                        log.error("Service {}  api {} TimeoutException  error in merging flux", serviceConfiguration.getName(),
                                    apiConfiguration.getPath(), throwable);
                        logTimeTaken(startTime, throwable);
                        responseHandler.handleError(null, new EmissaryAsyncServerTimeOutException());
                    })
                    .doOnError(throwable -> throwable instanceof PrematureCloseException, throwable -> {
                        logTimeTaken(startTime, throwable);
                        responseHandler.handleError(null, new EmissaryPrematureCloseException());
                    })
                    .doOnError(throwable -> throwable instanceof Throwable, throwable -> {
                        log.error("Service {}  api {}  {}  error in exchange function", serviceConfiguration.getName(),
                                    apiConfiguration.getPath(), throwable.getClass().getName(), throwable);
                        if (!(throwab
I think it got truncated
violetagg
@violetagg
Yes, I cannot see how you handle the response body.
vikasvb90
@vikasvb90
.doOnError(throwable -> throwable instanceof Throwable, throwable -> {
                        log.error("Service {}  api {}  {}  error in exchange function", serviceConfiguration.getName(),
                                    apiConfiguration.getPath(), throwable.getClass().getName(), throwable);
                        if (!(throwable instanceof ReadTimeoutException) && !(throwable instanceof io.netty.handler.timeout.TimeoutException)
                                && !(throwable instanceof WriteTimeoutException) && !(throwable instanceof PoolAcquirePendingLimitException)
                                && !(throwable instanceof PoolAcquireTimeoutException) && !(throwable instanceof PrematureCloseException)
                                && !(throwable instanceof ConnectTimeoutException)) {
                            EmissaryAsyncException emissaryAsyncException = new EmissaryAsyncException();
                            emissaryAsyncException.setStackTrace(throwable.getStackTrace());
                            responseHandler.handleError(null, emissaryAsyncException);
                        }
                    })
                    .flatMap(response -> {
                        int statusCode = response.statusCode().value();
                        log.error("API : {} response code : {}", serviceConfiguration.getBaseUrl() + apiConfiguration.getPath(), statusCode);
                        return response.toEntity(new ParameterizedTypeReference<String>() {
                        });
                    })
                    .subscribe(data -> {
                        boolean acceptable = apiConfiguration.getAcceptableResponseCodes().contains(data.getStatusCode().value());
                        byte[] body = data.getBody() == null ? null : data.getBody().getBytes();
                        if (acceptable)
                            responseHandler.handleResponse(HttpResponse.builder().body(body).statusCode(data.getStatusCodeValue()).headers(data.getHeaders().toSingleValueMap()).build());
                        else
                            responseHandler.handleError(HttpResponse.builder().body(body).statusCode(data.getStatusCodeValue()).build(), new EmissaryAsyncException(body, data.getStatusCodeValue()));
                    });
https://drive.google.com/file/d/1ZYi6Qmn-PyD2JnpIoBTT_vH17iuyPUqx/view?usp=sharing is a snapshot of the active connections of a service, taken from this metric:
reactor_netty_connection_provider_UserService_getUserMultiAttributes_active_connections
maxConnections was 40.
violetagg
@violetagg
I don't know what responseHandler.handleError( is doing, but if it does not consume the response body, this might be the cause of the connection leak.
You might even experience a memory leak if you do not consume the body.
I mean you do not have onError*
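To make the suspected mechanism concrete, here is a toy model under a stated assumption: a connection goes back to the pool only once the response body has been consumed or released. LeakModel and its methods are hypothetical names for illustration; this is not WebClient or Reactor Netty code, and it does not prove that this is what happens in the code above.

```java
/** Toy model: a connection returns to the pool only when the response body is consumed. */
final class LeakModel {

    private final int maxConnections;
    private int active;  // connections handed out and not yet returned

    LeakModel(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    /**
     * Simulates one exchange. Returns false if no connection could be acquired.
     * When bodyConsumed is false, the connection is never returned (the leak).
     */
    boolean exchange(boolean bodyConsumed) {
        if (active == maxConnections) {
            return false;          // pool exhausted, the request cannot proceed
        }
        active++;
        if (bodyConsumed) {
            active--;              // body drained, connection goes back to the pool
        }
        return true;
    }

    int activeConnections() {
        return active;
    }

    public static void main(String[] args) {
        LeakModel pool = new LeakModel(40);   // 40 matches the maxConnections mentioned above
        for (int i = 0; i < 40; i++) {
            pool.exchange(false);             // e.g. an error path that never reads the body
        }
        System.out.println("active=" + pool.activeConnections()
                + " nextRequestServed=" + pool.exchange(true));
    }
}
```

In this model, one unconsumed body per failing request is enough to pin every connection in the active state over time, regardless of load.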
vikasvb90
@vikasvb90
I mean, due to the absence of onError, can this code produce memory/connection leaks when there is an error?
violetagg
@violetagg
Try to put a small example somewhere that can reproduce the problem.
vikasvb90
@vikasvb90
If I understand it correctly, isn't the body already consumed in subscribe, and only then handleError is triggered?
vikasvb90
@vikasvb90
@violetagg
violetagg
@violetagg
I cannot say - try to provide a reproducible example
vikasvb90
@vikasvb90
@violetagg the problem here is that we are unsure of the cause of the error. Around 80-100 downstream services are called in the normal flows, and when we try one of the latent calls locally, it just works. We are still trying to reproduce locally all the types of error we have seen so far.
Meanwhile, if you have a reference to a production-ready module that uses Spring WebClient on Reactor Netty together with the Hystrix circuit breaker, it would really help.
vikasvb90
@vikasvb90
Our goal is very straightforward: we want to get away from the thread-based model of multiple parallel calls and use a non-blocking I/O model for better scalability. For this, we decided to use Spring WebClient, which uses Reactor Netty by default, and we switched the Hystrix isolation strategy from thread pool to semaphore. Any other production-ready alternative that can provide the expected results is also welcome.
rowenta
@rowenta
Hi folks, I was wondering how expensive operations like publishOn and subscribeOn are. If I have, for example, a source Flux of Strings and I want to append a String constant to each emitted element, how expensive would that operation be if I switch to a different thread pool instead of performing it on the thread that assembled the Flux?
Siarhei Abramenka
@Siarhei-Abramenka
Hi guys. Sorry if this is a stupid question.
Is this code valid, given that I'm calling subscribe inside subscribe? It's a modified example of the original code:
public void process() {
    Flux.just("1", "2", "3")
            .flatMap(this::error)
            .onErrorContinue((throwable, o) -> {
                if (throwable instanceof RuntimeException) {
                    Flux.just("word1", "word2", "word3")
                            .subscribe(System.out::println);
                }
            })
            .subscribe(System.out::println);
}

public Flux<String> error(String s) {
    if (s.equals("2")) {
        throw new RuntimeException("error");
    }
    return Flux.just(s);
}
Rossen Stoyanchev
@rstoyanchev
@vikasvb90 what server is this running on, and what does responseHandler do? Are the connections shown on the graph tied to the client (and not, say, the server)? Why is it a leak, or could it be normal growth? Where is the Hystrix part in all this?