function doSomething(item, callback) {
  if (itemIsGood(item)) {
    callback(convertItem, null);
  } else {
    callback(null, new Error("something is wrong with the item"));
  }
}
.map will handle the error and propagate it downstream; it is an "exceptional" case, hence side-effect.
RuntimeException). sink.success() (or sink.complete() if Flux) instead of passing an error and handling it later in the same operator (in the catch block), so as to use fewer operators
Question about the 'request' method:
final Flux<Boolean> randoms = Flux.generate(sink -> sink.next(true));
randoms.filter(x -> x == true).subscribe(x -> System.out.println(x));
As far as I know, in the code above LambdaSubscriber requests data from FilterFuseableSubscriber (that is, the downstream LambdaSubscriber requests from the upstream FilterFuseableSubscriber), and FilterFuseableSubscriber in turn requests data from its own upstream. My question is:
@Override
public void request(long n) {
    s.request(n);
}
Why is the requested amount the same as the downstream's? If the downstream requests 5 items, the filter should request more than 5, since it will drop some of them. Can somebody explain the logic? :)
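One way to picture the logic, as a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces rather than Reactor's own classes: the filter forwards request(n) unchanged, and every time the predicate rejects an element it asks upstream for one replacement. As far as I can tell this is the idea behind Reactor's filter as well (on the fused/conditional path the source simply does not count a rejected element against the demand), but the class names CountingSource, FilterSubscriber and run below are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

class FilterDemandSketch {

    /** Synchronous source emitting 1, 2, 3, ... that records the total demand it received. */
    static final class CountingSource implements Flow.Publisher<Integer> {
        long totalRequested = 0;

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long pending = 0;
                private boolean draining = false;
                private boolean cancelled = false;

                @Override
                public void request(long n) {
                    totalRequested += n;
                    pending += n;
                    if (draining) {
                        return; // re-entrant request: the loop below picks up the extra demand
                    }
                    draining = true;
                    while (pending > 0 && !cancelled) {
                        pending--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    /** Hypothetical filter: passes demand through as-is and replaces each dropped element. */
    static final class FilterSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {
        private final Flow.Subscriber<? super T> actual;
        private final Predicate<? super T> predicate;
        private Flow.Subscription s;

        FilterSubscriber(Flow.Subscriber<? super T> actual, Predicate<? super T> predicate) {
            this.actual = actual;
            this.predicate = predicate;
        }

        @Override public void onSubscribe(Flow.Subscription s) { this.s = s; actual.onSubscribe(this); }

        @Override
        public void onNext(T t) {
            if (predicate.test(t)) {
                actual.onNext(t);
            } else {
                s.request(1); // element dropped: ask upstream for one more to honour the demand
            }
        }

        @Override public void onError(Throwable t) { actual.onError(t); }
        @Override public void onComplete() { actual.onComplete(); }
        @Override public void request(long n) { s.request(n); } // same n as downstream
        @Override public void cancel() { s.cancel(); }
    }

    /** Requests `demand` even numbers; returns {items delivered, items requested from the source}. */
    static long[] run(long demand) {
        CountingSource source = new CountingSource();
        List<Integer> received = new ArrayList<>();
        Flow.Subscriber<Integer> downstream = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(Integer i) { received.add(i); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
        source.subscribe(new FilterSubscriber<Integer>(downstream, i -> i % 2 == 0));
        return new long[] { received.size(), source.totalRequested };
    }

    public static void main(String[] args) {
        long[] r = run(5);
        System.out.println("delivered=" + r[0] + ", requested from source=" + r[1]);
    }
}
```

With the even-numbers predicate and a downstream request of 5, the source in this sketch ends up being asked for 10 items in total: 5 from the forwarded request and 5 single replacements, so the downstream still receives exactly what it asked for.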
.onErrorResume
then ?
requestOnSeparateThread flag when we have separate schedulers for blocking and non-blocking reactor calls. I have used requestOnSeparateThread in my application but couldn't see any difference when setting requestOnSeparateThread = true/false
Hello Reactor team.
Static composition can be done with the combine() method, but I want a way to manage flux composition dynamically.
I would like to use it for an IoT project where the UI can listen to one or more sensors, depending on what the user wants.
The resulting flux is given to a web socket.
Subscribing to or unsubscribing from a sensor flux is done through a REST service.
I would like to know whether DirectProcessor is a good way to build a Flux multiplexer from other Flux instances.
I created a SimulatorModule to test how I can combine different fluxes into one flux with a DirectProcessor.
It seems to work fine (in reality a sensor produces 1 data point every 3 minutes, so backpressure is not a problem for now).
I also tried the FluxSink from DirectProcessor#sink() instead of DirectProcessor#onNext(), but it seems to be much less performant!
What is the difference between these two ways of feeding the DirectProcessor?
I tried EmitterProcessor too, and it seems to perform worse; is that the case?
Is EmitterProcessor the only way to manage backpressure?
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;

import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;

public class SimulatorModule {
    // Shared processor that every simulated sensor pushes into
    private DirectProcessor<Triple> dp;
    // One subscription per simulated sensor, keyed by sensor name
    private HashMap<String, Disposable> producers = new HashMap<>();

    public SimulatorModule() {
        dp = DirectProcessor.create();
        long start = Instant.now().toEpochMilli();
    }

    // Starts (or restarts) a simulated sensor that emits one Triple per interval
    public void simulate(String name, long interval) {
        removeSimulator(name);
        Disposable disposable = Flux.interval(Duration.ofMillis(interval))
                .map(e -> new Triple(name, System.currentTimeMillis(), 1.0))
                .subscribe(e -> dp.onNext(e));
        producers.put(name, disposable);
    }

    // Stops the simulated sensor with the given name, if there is one
    public void removeSimulator(String name) {
        Disposable disposable1 = producers.get(name);
        if (disposable1 != null) {
            disposable1.dispose();
            producers.remove(name);
        }
    }

    public Flux<Triple> flux() {
        return dp;
    }

    public static class Triple {
        public final String name;
        public final long ts;
        public final double value;

        public Triple(String name, long ts, double value) {
            this.name = name;
            this.ts = ts;
            this.value = value;
        }
    }
}
I call it with:
simulatorModule.flux()
        .filter(e -> e.name.equals("F1"))
        .subscribe(e -> System.out.println("1 " + e.name + " - " + Thread.currentThread().getName()));
simulatorModule.flux()
        .filter(e -> e.name.equals("F2"))
        .subscribe(e -> System.out.println("2 " + e.name + " - " + Thread.currentThread().getName()));
simulatorModule.simulate("F1", 1000);
simulatorModule.simulate("F2", 1200);
Thanks in advance for your help !
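On the onNext() versus sink() question, my understanding is that the sink adds a serialization step so that several producer threads can emit safely, whereas calling onNext() directly leaves it to the caller to make sure calls never overlap (Flux.interval runs on the parallel scheduler by default, so the F1 and F2 simulators may well emit from different threads). The following is only a plain-JDK sketch of that trade-off, not Reactor code: Hub, listen, emit and run are hypothetical names, and synchronized stands in for whatever serialization the real sink performs.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class SerializedHubSketch {

    /** Hypothetical multiplexer: many producers call emit(), listeners see one call at a time. */
    static final class Hub<T> {
        private final List<Consumer<? super T>> listeners = new CopyOnWriteArrayList<>();

        void listen(Consumer<? super T> listener) {
            listeners.add(listener);
        }

        // synchronized is the serialization step: producers on different threads
        // never run inside a listener at the same time. It is also the extra cost.
        synchronized void emit(T value) {
            for (Consumer<? super T> listener : listeners) {
                listener.accept(value);
            }
        }
    }

    /** Starts `producers` threads emitting `perProducer` values each; returns the listener's count. */
    static int run(int producers, int perProducer) {
        Hub<String> hub = new Hub<>();
        int[] count = {0}; // deliberately not thread-safe: only correct if emissions are serialized
        hub.listen(value -> count[0]++);

        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            String name = "F" + (p + 1);
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    hub.emit(name);
                }
            });
            threads[p].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000));
    }
}
```

Because every emission goes through the same lock, the listener's plain counter ends up exact; dropping the synchronized keyword would make the hub cheaper per call but would let concurrent producers interleave inside the listener, which is the kind of risk an unserialized onNext() carries.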
Hi everyone,
Could anyone please help me understand how to wait (block until the result is ready) for 3 levels of parallel execution?
Here is a test prototype I'm working on:
https://pastebin.com/L04QQwGE
Basically, each notification can have multiple receivers, and those receivers should be contacted through various broadcast media. Each notification broadcast can be executed in parallel, and the broadcast result is appended to each receiver
(all notification data is updated automatically, as it is passed by reference).
It does work with Thread.sleep(xxx); however, I need to intercept it in map() and save the updated collection to the DB,
but it is not waiting for the inner parallel tasks.
Mono.just(record)
        .flatMap(RunTransformation::tranformParallel)
        .sequential()
        .doOnTerminate(RunCompletion::record)
        .subscribe(RunAction::execute,
                   RunError::handleError);
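The general shape of "wait for nested parallel work" is that every level returns a handle that completes only when all of its children have completed, so the caller blocks once at the top instead of sleeping. Below is a minimal sketch of that shape using the JDK's CompletableFuture instead of Reactor (a deliberate substitution so it runs without extra dependencies); in Reactor the analogous move would presumably be to return the inner publishers from flatMap rather than subscribing to them on the side. All names here (broadcast, notifyReceiver, sendNotification, sendAll) are hypothetical and not taken from the pastebin.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class NestedParallelSketch {

    /** Level 3: contact one receiver over one medium (stand-in for the real broadcast call). */
    static CompletableFuture<String> broadcast(String receiver, String medium) {
        return CompletableFuture.supplyAsync(() -> receiver + " via " + medium + ": OK");
    }

    /** Level 2: all media for one receiver in parallel; completes when every broadcast is done. */
    static CompletableFuture<List<String>> notifyReceiver(String receiver, List<String> media) {
        List<CompletableFuture<String>> tasks = media.stream()
                .map(medium -> broadcast(receiver, medium))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))
                .thenApply(done -> tasks.stream()
                        .map(CompletableFuture::join) // already complete, so this does not block
                        .collect(Collectors.toList()));
    }

    /** Level 1: all receivers of one notification in parallel, results stored per receiver. */
    static CompletableFuture<Void> sendNotification(String notification, List<String> receivers,
                                                    List<String> media,
                                                    Map<String, List<String>> results) {
        CompletableFuture<?>[] tasks = receivers.stream()
                .map(r -> notifyReceiver(r, media)
                        .thenAccept(list -> results.put(notification + "/" + r, list)))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(tasks);
    }

    /** Top level: all notifications in parallel; the returned future completes after every level. */
    static CompletableFuture<Map<String, List<String>>> sendAll(List<String> notifications,
                                                                List<String> receivers,
                                                                List<String> media) {
        Map<String, List<String>> results = new ConcurrentHashMap<>();
        CompletableFuture<?>[] tasks = notifications.stream()
                .map(n -> sendNotification(n, receivers, media, results))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(tasks).thenApply(done -> results);
    }

    public static void main(String[] args) {
        Map<String, List<String>> results =
                sendAll(List.of("n1", "n2"), List.of("alice", "bob"), List.of("sms", "email"))
                        .join(); // blocks until all three levels have finished
        // At this point the complete result could be saved to the DB in one go.
        results.forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

The single join() at the top replaces Thread.sleep(xxx): nothing downstream of it runs until every broadcast for every receiver of every notification has reported back.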