Hello. I have a problem, but it is very weird, so I can't be certain that it is a bug.
First, I will explain the situation.
I wrote an l7check method, but because of a blocking-call issue, I wrapped it using Schedulers.boundedElastic().
(My reactor-core version is 3.3.4.)
@GetMapping("/monitor/l7check")
public Mono<ResponseEntity<String>> l7check() {
    // https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
    // DEFAULT_BOUNDED_ELASTIC_SIZE(maximum number of underlying threads to create) : 10 * Runtime.getRuntime().availableProcessors()
    // DEFAULT_BOUNDED_ELASTIC_QUEUESIZE : 100000
    // DEFAULT_TTL_SECONDS(Time-to-live for an idle) : 60
    return Mono
        .fromCallable(this::l7checkBlockingCall)
        .subscribeOn(Schedulers.boundedElastic());
}
When I start the application server, there is no problem. But after a day or two, the l7 check suddenly started failing.
10.x.x.x - - [24/Jun/2020:13:42:10 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:48:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.996 1.995 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:51:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:52:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 2.006 2.006 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:53:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.994 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:55:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.995 "-" "-" "-"
Not all l7 check requests are affected. However, once the problem occurs, it keeps occurring. It didn't happen with reactor-netty 0.9.2; the problem appeared after I upgraded to 0.9.6. It is too difficult to create a sample that reproduces the problem. Please advise on this matter, and tell me if any information is missing.
Hello All,
How do I store the value from an async call so I can pass it to the next async function as parameters?
For instance, as below:
public Mono<Account> fetchAccount() {
    Mono<Account> account = webClient.get().uri("/demo/account-partner").retrieve()
        .bodyToMono(Account.class)
        .onErrorMap(exception -> new RuntimeException("account-partner"));
    return account;
}

public Mono<Demo> fetchAccountAndPartner(String accountNo, String partnerNo) {
    Mono<Demo> accountAndPartner = webClient.get()
        .uri(uriBuilder ->
            UriComponentsBuilder.fromPath("/entity/{account-nr}/{partner-id}").build(accountNo, partnerNo)
        ).retrieve().bodyToMono(Demo.class);
    return accountAndPartner;
}
I would like to get the result of fetchAccount() and pass the accountNo and partnerNo from Account to
fetchAccountAndPartner(String accountNo, String partnerNo).
Thanks in advance
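One common way to do this in Reactor is to chain the two calls with flatMap, so the second call is built from the value the first one emits. A minimal sketch, assuming Account exposes accountNo and partnerNo fields (the question doesn't show the class) and replacing the WebClient calls with Mono.just stand-ins so that it runs with only reactor-core on the classpath:

```java
import reactor.core.publisher.Mono;

public class ChainCalls {
    // Hypothetical stand-ins for the Account and Demo types in the question.
    static final class Account {
        final String accountNo;
        final String partnerNo;
        Account(String accountNo, String partnerNo) {
            this.accountNo = accountNo;
            this.partnerNo = partnerNo;
        }
    }

    static final class Demo {
        final String description;
        Demo(String description) {
            this.description = description;
        }
    }

    // Stand-in for the WebClient call in fetchAccount().
    static Mono<Account> fetchAccount() {
        return Mono.just(new Account("A-1", "P-7"));
    }

    // Stand-in for the WebClient call in fetchAccountAndPartner(...).
    static Mono<Demo> fetchAccountAndPartner(String accountNo, String partnerNo) {
        return Mono.just(new Demo(accountNo + "/" + partnerNo));
    }

    // flatMap subscribes to the second call only after the first has emitted,
    // and hands the emitted Account to the lambda.
    static Mono<Demo> fetchDemo() {
        return fetchAccount()
            .flatMap(account -> fetchAccountAndPartner(account.accountNo, account.partnerNo));
    }

    public static void main(String[] args) {
        // block() is only for the demo; in WebFlux you would return the Mono.
        System.out.println(fetchDemo().block().description);
    }
}
```

Nothing needs to be stored in a variable outside the pipeline: the Account only exists inside the flatMap lambda.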
ERROR: Permission to reactor/reactor-core.git denied to johncfranco.
fatal: Could not read from remote repository.
Please make sure you have the correct access rights
and the repository exists.
reactor/reactor-core repository
https://docs.github.com/en/github/getting-started-with-github/fork-a-repo
Hi, can anyone help me please?
Pinging here after trying all afternoon with no success.
Mono.defer(() -> apiCallWhichReturnsMono())
    .doOnError(e -> callSomeAPI().subscribe())
    .map(someData -> /* continue with data */ someData)
I don't want to call .subscribe() explicitly in doOnError because it loses the subscriberContext that is coming from downstream.
Also, I don't want to wait on callSomeAPI because I'm not interested in the data.
Any help is appreciated.
@Test
fun stringBuilderWithWebFlux() {
    val builder = StringBuilder()
    builder.appendln("Initial")
    val list = listOf(1, 2, 3)
    val res = Flux.fromIterable(list)
        .flatMap {
            apiCall(it).map { builder.appendln(it) }
        }.then(builder.toString().toMono())
    StepVerifier.create(res)
        .consumeNextWith {
            print(it) // returns Initial instead of Initial \n Value 1 \n Value 2 \n Value 3
        }.verifyComplete()
}

private fun apiCall(it: Int?): Mono<String> {
    return "Value $it".toMono()
}
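A likely cause, as far as I can tell: builder.toString() is evaluated while the pipeline is being assembled, before any apiCall has run, so then(...) receives a Mono that already holds only "Initial". Deferring the toString() call until subscription time, for example with Mono.fromCallable, should give the full text. A Java sketch of both variants, with apiCall as a synchronous stand-in:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DeferredToString {
    // Stand-in for the apiCall in the test above.
    static Mono<String> apiCall(int i) {
        return Mono.just("Value " + i);
    }

    // toString() runs at assembly time, before anything was appended by flatMap.
    static Mono<String> eager() {
        StringBuilder builder = new StringBuilder("Initial\n");
        return Flux.just(1, 2, 3)
            .flatMap(i -> apiCall(i).map(v -> builder.append(v).append('\n')))
            .then(Mono.just(builder.toString()));
    }

    // fromCallable delays toString() until then(...) subscribes to it,
    // which happens only after the flatMap stage has completed.
    static Mono<String> lazy() {
        StringBuilder builder = new StringBuilder("Initial\n");
        return Flux.just(1, 2, 3)
            .flatMap(i -> apiCall(i).map(v -> builder.append(v).append('\n')))
            .then(Mono.fromCallable(builder::toString));
    }

    public static void main(String[] args) {
        System.out.print(eager().block());
        System.out.print(lazy().block());
    }
}
```

The Kotlin equivalent of the fix would be `.then(Mono.fromCallable { builder.toString() })`. Note that StringBuilder is not thread-safe, so with truly asynchronous inner calls a thread-safe accumulator or collectList() may be the safer choice.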
Hi. I wonder if my code is correct. I have a method that produces a Flux and I need to pick one element from it, or do something if the Flux is empty (generate an error). So far I have the following:
service.getFoo("foo")
    .sort(Comparator.comparing(Foo::date))
    .next()
    .switchIfEmpty(Mono.error(NotFoundException::new))
    .doOnNext(System.out::println)
    .subscribe();
It seems to work fine but I'm seeing the following ERROR in the log output:
2021-02-23 15:23:22 [main] ERROR [] reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: com.idexx.isc.realtime.server.session.ClientNotFoundException
Caused by: NotFoundException: null
...
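The ErrorCallbackNotImplemented in that log is what Reactor reports when an error reaches a subscribe() call that has no error consumer. A minimal sketch of handling it at the subscription site, assuming that is where the empty case should be dealt with; NotFoundException here is a local stand-in and the Flux of strings is hypothetical:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class HandleEmpty {
    // Local stand-in for the NotFoundException in the snippet above.
    static class NotFoundException extends RuntimeException {
    }

    // Picks the smallest element, or reports the error raised for an empty source.
    static String pickFirst(Flux<String> source) {
        StringBuilder outcome = new StringBuilder();
        source.sort()
            .next()
            .switchIfEmpty(Mono.error(NotFoundException::new))
            .subscribe(
                value -> outcome.append("found ").append(value),
                // The second argument is the error consumer that was missing.
                error -> outcome.append("error ").append(error.getClass().getSimpleName()));
        // The sources used here are synchronous, so the outcome is already set.
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(pickFirst(Flux.just("b", "a")));
        System.out.println(pickFirst(Flux.empty()));
    }
}
```

If the caller is itself reactive, returning the Mono and letting the caller subscribe (or using onErrorResume) is an alternative to subscribing here.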
Hi folks!!
I'm new to Reactor and I've run into an issue.
In the following code:
val flux = range(1, 10)
    .log()
    .doOnCancel { countDown.countDown() }
    .doOnComplete { countDown.countDown() }
    .delayElements(Duration.ofSeconds(1))
    .doOnNext {
        val state = repository.find(receipt.movementIdentification)
        logger.info("State = $state")
        if (state == "CANCELED") throw PixCanceledException("server.error", "Pix cancelled")
        if (state == "FINISHED") countDown.countDown()
    }
    .map { receipt }
    .collectList()
    .map { it.component1() }
    .publishOn(Schedulers.boundedElastic())
flux.subscribe()
countDown.await()
In the code above I want to finish/cancel the subscription from within the pipeline. How can I do this?
Looking at some questions on Stack Overflow, I saw that doing this is generally discouraged. In my case I could throw an exception and it would work, but I want to do it the right way.
The documentation doesn't say specifically how I can send a signal from an onNext step. It mostly explains signals sent, for example, when an error occurs or when the subscription is cancelled from outside the producer.
Sincerely, I hope I managed to express myself clearly.
Thanks!!
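One option that avoids throwing is takeUntil, which passes the matching element downstream, then completes the sequence and cancels the upstream. A minimal sketch, assuming the state lookup can be expressed as a predicate; the list of states below is a hypothetical stand-in for repository.find(...):

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

public class StopEarly {
    // Emits one index per poll and stops right after the first "FINISHED" state.
    static List<Integer> pollUntilFinished(List<String> states) {
        return Flux.range(0, states.size())
            // The element that satisfies the predicate is still emitted,
            // then the sequence completes and the upstream is cancelled.
            .takeUntil(i -> "FINISHED".equals(states.get(i)))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        List<String> states = Arrays.asList("PENDING", "PENDING", "FINISHED", "PENDING");
        System.out.println(pollUntilFinished(states));
    }
}
```

For a state that really is a failure, such as CANCELED, throwing (or switching to Mono.error) still seems reasonable, since it lets the subscriber tell the two outcomes apart.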
Processor API over to using the new Sinks API instead. The new API seems much more intuitive and well thought out, thanks for all your hard work! One thing we've been hit by is the difference between a call to processor.sink(), which provides a serialised sink that gates/serialises concurrent calls to onNext(), and the Sinks.many() factory methods that fail with EmitResult.FAIL_NON_SERIALIZED when multiple producers emit concurrently. I can understand the rationale for this, and found this SO answer which suggests a 'wait and retry' failure handler to work around the issue, and this seems to be working for us. My first question is: is this a reasonable way to solve this problem; and second: are there any plans to provide some additional factory methods or decorators to do this with an API call without needing a custom failure handler? I am thinking something like Sinks.many().unicast().serialised() or something similar? Thanks in advance!
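For reference, a minimal sketch of the kind of 'retry on contention' handler described above, assuming reactor-core 3.4 or later and a plain busy-spin retry that applies only to FAIL_NON_SERIALIZED; the class and method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

public class SerializedEmit {
    // Returning true asks emitNext to try again; returning false makes it
    // throw for every other kind of failure.
    static final Sinks.EmitFailureHandler RETRY_ON_CONTENTION =
        (signalType, result) -> result == EmitResult.FAIL_NON_SERIALIZED;

    // Emits from several threads at once and returns how many items arrived.
    static long emitFromThreads(int threads, int perThread) {
        Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    sink.emitNext(i, RETRY_ON_CONTENTION);
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        sink.emitComplete(RETRY_ON_CONTENTION);
        // The unicast sink buffers until its single subscriber arrives.
        return sink.asFlux().count().block();
    }

    public static void main(String[] args) {
        System.out.println(emitFromThreads(4, 250));
    }
}
```

A spin loop like this can burn CPU under heavy contention, so a bounded retry or a short park between attempts may be preferable in production code.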
Hello People,
Going to be a long read; :(
We are working with Reactor on WebFlux and have some performance issues when we load test with a lot of concurrent users (15-20K). We have two basic questions whose answers could possibly help us improve the performance of our systems.
Is there any specific reason the CoreSubscriber methods are void and not Mono<Void>? Does that mean whatever code we write there will be blocking? We are using slf4j MDC to pass a trace-id into all the logs. For this we have a similar approach to the one mentioned here. The approach is to implement a CoreSubscriber in which we copy the trace-id from the current context to the MDC. We see that a lot of CPU time is spent in this method itself. Please let us know if we are doing something wrong.
How does running tasks on a scheduler help if we have a single CPU? In our case we are running 10 Kubernetes containers with "1000m" CPU each, which is more like a single-core machine (if I understand correctly). In this case it should not matter if we run some work on a scheduler, because in the end there is a single core and nothing can run concurrently. To test this theory I doubled the CPU in each container (2000m) and reduced the number of containers to 5. Theoretically, this should reduce the response time, but there was not much difference.
Please let us know if there are any pointers on these.