function doSomething(item, callback) {
  if (itemIsGood(item)) {
    callback(convertItem, null);
  } else {
    callback(null, new Error("something is wrong with the item"));
  }
}
.map will handle the error and propagate it downstream; it is an "exceptional" case, hence a side effect. RuntimeException
). sink.success() (or sink.complete() if Flux) instead of passing an error and handling it later in the same operator (in the catch block), to use fewer operators
question about 'request' method
final Flux<Boolean> randoms = Flux.generate(sink -> sink.next(true));
randoms.filter(x -> x).subscribe(x -> System.out.println(x));
As far as I know, in the code above LambdaSubscriber requests data from FilterFuseableSubscriber, i.e. the downstream (LambdaSubscriber) requests from its upstream (FilterFuseableSubscriber), and FilterFuseableSubscriber in turn requests data from its own upstream. My question is about this method:
@Override
public void request(long n) {
    s.request(n);
}
Why is the amount requested from upstream the same as what downstream asked for? If downstream requests 5 items, the filter should need to request more than 5, since it will drop some of them. Can somebody explain the logic? :)
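One way to see why forwarding the same n can be enough: as far as I know, a Reactor filter does not inflate the request up front. It either asks upstream for one more item each time the predicate rejects a value, or (on the conditional/fuseable path) reports the rejection so the source does not count that item against the demand. The sketch below is a simplified JDK-only model of the first variant, not Reactor's actual implementation; CountingSource, FilterStage and simulate are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

public class FilterDemandSketch {

    /** Hypothetical upstream: emits 1, 2, 3, ... but only while demand is outstanding. */
    public static final class CountingSource {
        private final FilterStage target;
        private long demand;        // items requested but not yet emitted
        private boolean draining;   // guards against re-entrant emission
        private int next = 1;
        public long totalRequested; // everything ever requested from this source

        CountingSource(FilterStage target) { this.target = target; }

        void request(long n) {
            demand += n;
            totalRequested += n;
            if (draining) return;   // the loop already running picks up the new demand
            draining = true;
            while (demand > 0) {
                demand--;
                target.onNext(next++);
            }
            draining = false;
        }
    }

    /** Simplified filter: forwards request(n) unchanged, re-requests 1 per dropped item. */
    public static final class FilterStage {
        private final IntPredicate predicate;
        public final List<Integer> delivered = new ArrayList<>();
        CountingSource upstream;

        FilterStage(IntPredicate predicate) { this.predicate = predicate; }

        void request(long n) { upstream.request(n); } // same n the downstream asked for

        void onNext(int value) {
            if (predicate.test(value)) {
                delivered.add(value);   // this item satisfies one unit of downstream demand
            } else {
                upstream.request(1);    // dropped item: ask for a replacement
            }
        }
    }

    /** Holds what the downstream received and what the source was asked for. */
    public static final class Result {
        public final List<Integer> delivered;
        public final long totalRequested;
        Result(List<Integer> delivered, long totalRequested) {
            this.delivered = delivered;
            this.totalRequested = totalRequested;
        }
    }

    /** Downstream requests n items; the filter keeps even numbers only. */
    public static Result simulate(long n) {
        FilterStage filter = new FilterStage(v -> v % 2 == 0);
        CountingSource source = new CountingSource(filter);
        filter.upstream = source;
        filter.request(n);
        return new Result(filter.delivered, source.totalRequested);
    }

    public static void main(String[] args) {
        Result r = simulate(5);
        System.out.println(r.delivered + " delivered, " + r.totalRequested + " requested upstream");
    }
}
```

In this model the downstream still receives exactly the number of items it asked for, while the total demand seen by the source grows by one for every dropped item.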
.onErrorResume then?
requestOnSeparateThread flag when we have separate schedulers for blocking and non-blocking reactor calls. I have used requestOnSeparateThread in my application but couldn't see any difference when setting requestOnSeparateThread = true/false
Hello Reactor team.
Static composition can be done with the combine() method, but I want a way to manage Flux composition dynamically.
I would like to use it for an IoT project where the UI can listen to one or more sensors, depending on what the user wants.
The resulting Flux is given to a web socket.
Subscribing to or unsubscribing from a sensor Flux is done through a REST service.
I would like to know whether DirectProcessor is a good way to create a Flux multiplexer from other Fluxes.
I created a SimulatorModule to test how I can combine different Fluxes into one Flux with a DirectProcessor.
It seems to work fine (in reality a sensor produces 1 data point every 3 minutes, so backpressure is not a problem for now).
I also tried the FluxSink from DirectProcessor#sink() instead of DirectProcessor#onNext(), but it seems to be much less performant!
What is the difference between these two ways to feed the DirectProcessor?
I tried EmitterProcessor and it seems to perform worse. Is that the case?
Is EmitterProcessor the only way to manage backpressure?
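import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;

import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;

public class SimulatorModule {
    // Shared processor that every simulated sensor pushes into
    private DirectProcessor<Triple> dp;
    // Active simulators by name, so they can be disposed individually
    private HashMap<String, Disposable> producers = new HashMap<>();

    public SimulatorModule() {
        dp = DirectProcessor.create();
        long start = Instant.now().toEpochMilli();
    }

    // Starts (or restarts) a simulator that emits one Triple every `interval` ms
    public void simulate(String name, long interval) {
        removeSimulator(name);
        Disposable disposable = Flux.interval(Duration.ofMillis(interval))
                .map(e -> new Triple(name, System.currentTimeMillis(), 1.0))
                .subscribe(e -> dp.onNext(e));
        producers.put(name, disposable);
    }

    // Stops the simulator registered under `name`, if any
    public void removeSimulator(String name) {
        Disposable disposable1 = producers.get(name);
        if (disposable1 != null) {
            disposable1.dispose();
            producers.remove(name);
        }
    }

    public Flux<Triple> flux() {
        return dp;
    }

    // Immutable sample: sensor name, timestamp (ms), value
    public static class Triple {
        public final String name;
        public final long ts;
        public final double value;

        public Triple(String name, long ts, double value) {
            this.name = name;
            this.ts = ts;
            this.value = value;
        }
    }
}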
I call it with:
simulatorModule.flux()
        .filter(e -> e.name.equals("F1"))
        .subscribe(e -> System.out.println("1 " + e.name + " - " + Thread.currentThread().getName()));
simulatorModule.flux()
        .filter(e -> e.name.equals("F2"))
        .subscribe(e -> System.out.println("2 " + e.name + " - " + Thread.currentThread().getName()));
simulatorModule.simulate("F1", 1000);
simulatorModule.simulate("F2", 1200);
Thanks in advance for your help!
Hi everyone,
Could anyone please help me understand how to wait (block until the result is ready) for three levels of parallel execution?
Here is a test prototype I'm working on:
https://pastebin.com/L04QQwGE
Basically, each notification can have multiple receivers, and those receivers should be contacted through various broadcast media. Each notification broadcast can be executed in parallel, and the broadcast result is appended to each receiver
(all notification data is updated automatically, as it is passed by reference).
It does work with Thread.sleep(xxx); however, I need to intercept it in map() and save the updated collection to the DB,
but it is not waiting for the inner parallel tasks.
Mono.just(record)
    .flatMap(RunTransformation::tranformParallel)
    .sequential()
    .doOnTerminate(RunCompletion::record)
    .subscribe(RunAction::execute,
               RunError::handleError);
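For the nested fan-out question above, the general idea is that each level returns a handle that completes only when all of its children have completed, so the outermost caller can wait once instead of sleeping. In Reactor that usually means returning the inner publishers from flatMap and ending each level with collectList() or then(), rather than subscribing inside the chain. The sketch below shows the same shape with the JDK's CompletableFuture (a swapped-in technique so that it runs without dependencies); broadcast, notifyReceiver, send and sendAll are hypothetical stand-ins, not the code from the pastebin.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class NestedFanOutSketch {

    // Level 3: one broadcast of one notification to one receiver over one medium.
    public static CompletableFuture<String> broadcast(String notification, String receiver, String medium) {
        return CompletableFuture.supplyAsync(() -> notification + ":" + receiver + ":" + medium);
    }

    // Level 2: all media for one receiver; completes when every broadcast is done.
    public static CompletableFuture<List<String>> notifyReceiver(String notification, String receiver, List<String> media) {
        return allAsList(media.stream()
                .map(m -> broadcast(notification, receiver, m))
                .collect(Collectors.toList()));
    }

    // Level 1: all receivers of one notification.
    public static CompletableFuture<List<String>> send(String notification, List<String> receivers, List<String> media) {
        return allAsList(receivers.stream()
                .map(r -> notifyReceiver(notification, r, media))
                .collect(Collectors.toList()))
                .thenApply(NestedFanOutSketch::flatten);
    }

    // Top level: all notifications; the returned future is the single thing to wait on.
    public static CompletableFuture<List<String>> sendAll(List<String> notifications, List<String> receivers, List<String> media) {
        return allAsList(notifications.stream()
                .map(n -> send(n, receivers, media))
                .collect(Collectors.toList()))
                .thenApply(NestedFanOutSketch::flatten);
    }

    // Completes when all futures complete; results keep the order of the input list.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join) // already complete here, so join does not block
                        .collect(Collectors.toList()));
    }

    static List<String> flatten(List<List<String>> lists) {
        return lists.stream().flatMap(List::stream).collect(Collectors.toList());
    }
}
```

A save-to-DB step would then hang off the outermost future (for example with thenAccept), which only runs after every inner broadcast has finished.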
Hello. I have a problem, but it is a very weird one, so I can't be certain that it is a bug.
First, I will explain the situation.
I wrote an l7check method. Because it makes a blocking call, I wrapped it using Schedulers.boundedElastic()
(My reactor-core version is 3.3.4)
@GetMapping("/monitor/l7check")
public Mono<ResponseEntity<String>> l7check() {
    // https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
    // DEFAULT_BOUNDED_ELASTIC_SIZE(maximum number of underlying threads to create) : 10 * Runtime.getRuntime().availableProcessors()
    // DEFAULT_BOUNDED_ELASTIC_QUEUESIZE : 100000
    // DEFAULT_TTL_SECONDS(Time-to-live for an idle) : 60
    return Mono
            .fromCallable(this::l7checkBlockingCall)
            .subscribeOn(Schedulers.boundedElastic());
}
When I start the application server, there is no problem. But after a day or two, the l7 check suddenly started failing.
10.x.x.x - - [24/Jun/2020:13:42:10 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:48:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.996 1.995 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:51:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:52:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 2.006 2.006 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:53:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.994 "-" "-" "-"
10.x.x.x - - [24/Jun/2020:13:55:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.995 "-" "-" "-"
Not every l7check request fails. However, once the problem occurs, it keeps occurring. It did not happen with reactor-netty 0.9.2; the problem appeared when I upgraded to 0.9.6. It is too difficult to create a sample that reproduces the problem. Please advise on this matter, and tell me if any information is missing.
Hello all,
How do I store the value from an async call so that I can pass it to the next async function as parameters?
For instance, as below:
public Mono<Account> fetchAccount() {
    Mono<Account> account = webClient.get().uri("/demo/account-partner").retrieve()
            .bodyToMono(Account.class)
            .onErrorMap(exception -> new RuntimeException("account-partner"));
    return account;
}
public Mono<Demo> fetchAccountAndPartner(String accountNo, String partnerNo) {
    Mono<Demo> accountAndPartner = webClient.get()
            // use the supplied uriBuilder so the WebClient's base URL is kept
            .uri(uriBuilder -> uriBuilder
                    .path("/entity/{account-nr}/{partner-id}")
                    .build(accountNo, partnerNo))
            .retrieve()
            .bodyToMono(Demo.class);
    return accountAndPartner;
}
I would like to get the result of fetchAccount() and pass the accountNo and partnerNo from the Account to
fetchAccountAndPartner(String accountNo, String partnerNo)
Thanks in advance
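For chaining two async calls like this, the value does not need to be stored anywhere: the second call can be started from inside a continuation of the first. In Reactor the operator for that is flatMap, roughly fetchAccount().flatMap(a -> fetchAccountAndPartner(a.getAccountNo(), a.getPartnerNo())), assuming Account exposes such getters (the class is not shown, so those names are a guess). The sketch below shows the same chaining shape with the JDK's CompletableFuture.thenCompose so it runs without dependencies; the Account and Demo classes and their fields here are hypothetical stand-ins.

```java
import java.util.concurrent.CompletableFuture;

public class ChainAsyncSketch {

    /** Hypothetical shape of the Account payload; the real class is not shown in the question. */
    public static final class Account {
        public final String accountNo;
        public final String partnerNo;
        Account(String accountNo, String partnerNo) {
            this.accountNo = accountNo;
            this.partnerNo = partnerNo;
        }
    }

    /** Hypothetical result of the second call. */
    public static final class Demo {
        public final String description;
        Demo(String description) { this.description = description; }
    }

    // Stand-in for the first remote call.
    public static CompletableFuture<Account> fetchAccount() {
        return CompletableFuture.supplyAsync(() -> new Account("ACC-1", "P-9"));
    }

    // Stand-in for the second remote call, which needs values from the first.
    public static CompletableFuture<Demo> fetchAccountAndPartner(String accountNo, String partnerNo) {
        return CompletableFuture.supplyAsync(() -> new Demo(accountNo + "/" + partnerNo));
    }

    // The second call starts only once the Account is available; nothing blocks in between.
    public static CompletableFuture<Demo> fetchDemo() {
        return fetchAccount()
                .thenCompose(a -> fetchAccountAndPartner(a.accountNo, a.partnerNo));
    }

    public static void main(String[] args) {
        // join() is used here only to print the result in a standalone program
        System.out.println(fetchDemo().join().description);
    }
}
```

The design point is the same in both libraries: map-style operators transform a value synchronously, while flatMap/thenCompose take a function that itself returns an async result and flatten it into the chain.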