Question about the 'request' method
final Flux<Boolean> randoms = Flux.generate(sink -> sink.next(true));
randoms.filter(x -> x == true).subscribe(x -> System.out.println(x));
As far as I know, in the code above LambdaSubscriber requests data from FilterFuseableSubscriber, i.e. the downstream (LambdaSubscriber) requests data from its upstream (FilterFuseableSubscriber), and FilterFuseableSubscriber in turn requests data from its own upstream. My question is about this method:
@Override
public void request(long n) {
    s.request(n);
}
Why is the requested amount the same as the downstream's? If the downstream requests 5 items, I would expect the filter to request more than 5 from upstream, since it will drop some of them. Can somebody explain the logic? :)
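One way to picture the logic: a filter stage cannot know in advance how many items will be dropped, so it forwards the downstream demand unchanged and then asks upstream for one more item each time the predicate rejects one. As far as I understand, Reactor's filter operator works along these lines, but the sketch below is not Reactor's code. It uses the JDK's java.util.concurrent.Flow interfaces, and FilterDemandSketch, CountingSource, FilterSubscriber and requestEvens are hypothetical names made up for illustration. Note also that subscribe(Consumer) in the snippet above requests an unbounded amount, so the question only matters for bounded demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Illustrative sketch only; every class name here is hypothetical, not Reactor's.
final class FilterDemandSketch {

    // Synchronous source emitting 1, 2, 3, ... that records the total demand it has seen.
    static final class CountingSource implements Flow.Publisher<Integer> {
        long totalRequested;

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long pending;
                private boolean draining;
                private boolean cancelled;

                @Override
                public void request(long n) {
                    totalRequested += n;
                    pending += n;
                    if (draining) {
                        return; // re-entrant request: the loop below picks it up
                    }
                    draining = true;
                    while (pending > 0 && !cancelled) {
                        pending--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    // Filter stage: forwards downstream demand as-is, then tops it up per dropped item.
    static final class FilterSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {
        private final Flow.Subscriber<? super T> downstream;
        private final Predicate<? super T> predicate;
        private Flow.Subscription upstream;

        FilterSubscriber(Flow.Subscriber<? super T> downstream, Predicate<? super T> predicate) {
            this.downstream = downstream;
            this.predicate = predicate;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(this);
        }

        @Override
        public void onNext(T item) {
            if (predicate.test(item)) {
                downstream.onNext(item);
            } else {
                upstream.request(1); // replace the item that was filtered out
            }
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }

        @Override
        public void request(long n) {
            upstream.request(n); // same n as downstream, like the method in the question
        }

        @Override
        public void cancel() {
            upstream.cancel();
        }
    }

    // Requests `demand` even numbers through the filter and returns what arrived.
    static List<Integer> requestEvens(CountingSource source, long demand) {
        List<Integer> received = new ArrayList<>();
        source.subscribe(new FilterSubscriber<Integer>(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        }, x -> x % 2 == 0));
        return received;
    }

    public static void main(String[] args) {
        CountingSource source = new CountingSource();
        List<Integer> received = requestEvens(source, 5);
        System.out.println(received + " delivered, upstream saw a total demand of " + source.totalRequested);
    }
}
```

With a downstream demand of 5 and a predicate that keeps even numbers, the downstream receives 2, 4, 6, 8, 10 while the source sees a total demand of 10: the initial 5 plus one extra request for each of the five odd numbers that were dropped.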
.onErrorResume
then ?
requestOnSeparateThread flag when we have separate schedulers for blocking and non-blocking reactor calls. I have used requestOnSeparateThread in my application but couldn't see any difference when setting requestOnSeparateThread = true/false
Hello Reactor team.
Static composition can be done with the combine() method, but I want a way to manage flux composition dynamically.
I would like to use it for an IoT project where the UI can listen to one or more sensors, depending on what the user wants.
The resulting flux is passed to a web socket.
Subscribing to or unsubscribing from a sensor flux is done through a REST service.
I would like to know whether DirectProcessor is a good way to build a Flux multiplexer from other Flux instances.
I created a SimulatorModule to test how I can combine different fluxes into one with a DirectProcessor.
It seems to work fine (in reality a sensor produces one data point every 3 minutes, so backpressure is not a problem for now).
I also tried the FluxSink from DirectProcessor#sink() instead of DirectProcessor#onNext(), but it seems to perform much worse!
What is the difference between these two ways of feeding the DirectProcessor?
I also tried EmitterProcessor, and it seems to perform worse. Is that expected?
Is EmitterProcessor the only way to manage backpressure?
public class SimulatorModule {
    private DirectProcessor<Triple> dp;
    private HashMap<String, Disposable> producers = new HashMap<>();

    public SimulatorModule() {
        dp = DirectProcessor.create();
        long start = Instant.now().toEpochMilli();
    }

    public void simulate(String name, long interval) {
        removeSimulator(name);
        Disposable disposable = Flux.interval(Duration.ofMillis(interval))
                .map(e -> new Triple(name, System.currentTimeMillis(), 1.0))
                .subscribe(e -> dp.onNext(e));
        producers.put(name, disposable);
    }

    public void removeSimulator(String name) {
        Disposable disposable1 = producers.get(name);
        if (disposable1 != null) {
            disposable1.dispose();
            producers.remove(name);
        }
    }

    public Flux<Triple> flux() {
        return dp;
    }

    public static class Triple {
        public final String name;
        public final long ts;
        public final double value;

        public Triple(String name, long ts, double value) {
            this.name = name;
            this.ts = ts;
            this.value = value;
        }
    }
}
I call it with:
simulatorModule.flux()
        .filter(e -> e.name.equals("F1"))
        .subscribe(e -> System.out.println("1 " + e.name + " - " + Thread.currentThread().getName()));
simulatorModule.flux()
        .filter(e -> e.name.equals("F2"))
        .subscribe(e -> System.out.println("2 " + e.name + " - " + Thread.currentThread().getName()));
simulatorModule.simulate("F1", 1000);
simulatorModule.simulate("F2", 1200);
Thanks in advance for your help !
Hi Everyone
Could anyone please help me understand how to wait (block until the result is ready) for three levels of parallel execution?
Here is a test prototype I'm working on:
https://pastebin.com/L04QQwGE
Basically, each notification can have multiple receivers, and those receivers should be contacted through various broadcast media. Each notification broadcast can be executed in parallel, and the broadcast result is appended to each receiver
(all notification data is updated automatically because it is passed by reference).
It does work with Thread.sleep(xxx); however, I need to intercept it in map() and save the updated collection to the DB,
but it does not wait for the inner parallel tasks.
Mono.just(record)
        .flatMap(RunTransformation::tranformParallel)
        .sequential()
        .doOnTerminate(RunCompletion::record)
        .subscribe(RunAction::execute,
                RunError::handleError);
Hello. I have a problem, but it is so strange that I can't be certain it is a bug.
First, I will explain the situation.
I wrote an l7check method, and because it makes a blocking call, I wrapped it using Schedulers.boundedElastic().
(My reactor-core version is 3.3.4.)
@GetMapping("/monitor/l7check")
public Mono<ResponseEntity<String>> l7check() {
    // https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
    // DEFAULT_BOUNDED_ELASTIC_SIZE(maximum number of underlying threads to create) : 10 * Runtime.getRuntime().availableProcessors()
    // DEFAULT_BOUNDED_ELASTIC_QUEUESIZE : 100000
    // DEFAULT_TTL_SECONDS(Time-to-live for an idle) : 60
    return Mono
            .fromCallable(this::l7checkBlockingCall)
            .subscribeOn(Schedulers.boundedElastic());
}
When I start the application server, there is no problem. But after a day or two, the l7 check suddenly started failing:
10.x.x.x - - [24/Jun/2020:13:42:10 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:48:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.996 1.995 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:51:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:52:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 2.006 2.006 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:53:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.994 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:55:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.995 "-" "-" "-"
Not every l7 check request is affected. However, once the problem occurs, it keeps occurring. It did not happen when the reactor-netty version was 0.9.2; the problem appeared after I upgraded to 0.9.6. It is very difficult to create a sample that reproduces the problem. Please advise, and tell me if any information is missing.
Hello All,
How can I store the value from an async call so that I can pass it as a parameter to the next async function?
For instance, given the following:
public Mono<Account> fetchAccount() {
    Mono<Account> account = webClient.get().uri("/demo/account-partner").retrieve()
            .bodyToMono(Account.class).onErrorMap(exception -> new RuntimeException("account-partner"));
    return account;
}

public Mono<Demo> fetchAccountAndPartner(String accountNo, String partnerNo) {
    Mono<Demo> accountAndPartner = webClient.get()
            .uri(uriBuilder ->
                    UriComponentsBuilder.fromPath("/entity/{account-nr}/{partner-id}").build(accountNo, partnerNo)
            ).retrieve().bodyToMono(Demo.class);
    return accountAndPartner;
}
I would like to take the result of fetchAccount() and pass the accountNo and partnerNo from the Account to
fetchAccountAndPartner(String accountNo, String partnerNo).
Thanks in advance.
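The usual approach here is not to store the intermediate value at all, but to compose the two calls so that the second one is built from the result of the first. In Reactor the operator for that is Mono.flatMap, for example fetchAccount().flatMap(account -> fetchAccountAndPartner(account.getAccountNo(), account.getPartnerNo())), where the two getters are assumed names because the Account class is not shown. The sketch below shows the same composition with the JDK's CompletableFuture.thenCompose so that it runs without any dependency. ChainAsyncSketch, the stub Account and Demo classes, their fields and the sample values are all hypothetical stand-ins.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch; Account, Demo and their fields are hypothetical stand-ins.
final class ChainAsyncSketch {

    static final class Account {
        final String accountNo;
        final String partnerNo;

        Account(String accountNo, String partnerNo) {
            this.accountNo = accountNo;
            this.partnerNo = partnerNo;
        }
    }

    static final class Demo {
        final String description;

        Demo(String description) {
            this.description = description;
        }
    }

    // Stand-in for the first remote call.
    static CompletableFuture<Account> fetchAccount() {
        return CompletableFuture.supplyAsync(() -> new Account("A-1", "P-9"));
    }

    // Stand-in for the second remote call, which needs values from the first result.
    static CompletableFuture<Demo> fetchAccountAndPartner(String accountNo, String partnerNo) {
        return CompletableFuture.supplyAsync(() -> new Demo(accountNo + "/" + partnerNo));
    }

    // Compose instead of storing: the second call is created from the first call's result.
    static CompletableFuture<Demo> fetchDemo() {
        return fetchAccount()
                .thenCompose(account -> fetchAccountAndPartner(account.accountNo, account.partnerNo));
    }

    public static void main(String[] args) {
        // join() blocks only here, at the edge of the program, to print the result.
        System.out.println(fetchDemo().join().description);
    }
}
```

The point of the design is that the Account never has to live in a field or variable outside the chain: it exists only as the lambda parameter, and the caller receives a single asynchronous result for the whole sequence.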
ERROR: Permission to reactor/reactor-core.git denied to johncfranco.
fatal: Could not read from remote repository.
Please make sure you have the correct access rights
and the repository exists.
reactor/reactor-core repository
https://docs.github.com/en/github/getting-started-with-github/fork-a-repo
Hi, can anyone help me please?
Pinging here after trying all afternoon with no success.
Mono.defer(() -> apiCallWhichReturnsMono())
        .doOnError(e -> callSomeAPI().subscribe())
        .map(someData -> /* continue with data */ someData);
I don't want to call .subscribe() explicitly in doOnError, because it loses the subscriberContext that comes from downstream.
I also don't want to wait on callSomeAPI, because I'm not interested in the data it returns.
Any help is appreciated.