Okay. When I use input.handle(this::validate), it wants me to create a validate method with the following signature:
private <R> void validate(String s, SynchronousSink<R> rSynchronousSink) {
}
I don't see BiConsumer. Also, the return type of this method is void instead of Mono<Void>. There could be some validations that require accessing the database using reactive repositories, and I would have to call block() on them in this validate method, which I don't want to do. Actually I am new to Reactor :| and unable to solve this problem. Could you please provide some more input?
Mono<Void>, returns Mono<Boolean> or some Mono<CustomObject>. Then I can use flatMap on it. But I am just wondering whether it is possible if this method returns Mono<Void>? I tried using flatMap for Mono<Void> but it didn't work.
@amangarg4804 .handle(this::validate) automatically converts your method into a BiConsumer. I would suggest reading about Java's lambdas and method references.
If you need to do async validation, you can use input.delayUntil(this::validate), where validate is declared as private Mono<Void> validate(String s).
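To illustrate the method-reference point, here is a minimal sketch using only the JDK. Sink is a hypothetical stand-in for Reactor's SynchronousSink (it is not the real interface), and MethodRefDemo and run are made-up names; the point is only that a two-argument void method can be used wherever a BiConsumer is expected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

class MethodRefDemo {

    // Hypothetical stand-in for Reactor's SynchronousSink, so the sketch has no dependencies.
    interface Sink<R> {
        void next(R value);
        void error(Throwable t);
    }

    // Same shape as the validate method in the question: two parameters, void return.
    static void validate(String s, Sink<String> sink) {
        if (s.isEmpty()) {
            sink.error(new IllegalArgumentException("empty input"));
        } else {
            sink.next(s);
        }
    }

    // Feeds one input through validate and records what the sink received.
    static List<String> run(String input) {
        List<String> events = new ArrayList<>();
        Sink<String> sink = new Sink<String>() {
            @Override public void next(String value) { events.add("next:" + value); }
            @Override public void error(Throwable t) { events.add("error:" + t.getMessage()); }
        };
        // The compiler adapts the method reference to BiConsumer;
        // no class implementing BiConsumer is written by hand.
        BiConsumer<String, Sink<String>> consumer = MethodRefDemo::validate;
        consumer.accept(input, sink);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run("abc"));
        System.out.println(run(""));
    }
}
```

With Reactor on the classpath, the same adaptation happens when this::validate is passed to handle, which is why no BiConsumer shows up in your own code.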
WebClient
to emit Monos of some data
Client1 and Client2. I want to call each in a Mono.zip so that they can be done concurrently
zip
method arguments
public Mono<Tuple2<String, String>> getPair() {
    return Mono.zip(
        client1.getFirstPart(),
        client2.getFirstPart());
}
function doSomething(item, callback) {
    if (itemIsGood(item)) {
        callback(convertItem, null);
    } else {
        callback(null, new Error("something is wrong with the item"));
    }
}
.map
will handle the error and propagate it downstream, it is an "exceptional" case, hence side-effect.RuntimeException
).sink.success()
(or sink.complete()
if Flux) instead of passing an error and handling it later in the same operator (in the catch block) to use less operators
question about 'request' method
final Flux<Boolean> randoms = Flux.generate(sink -> sink.next(true));
randoms.filter(x -> x == true).subscribe(x -> System.out.println(x));
As far as I know, for the above code, LambdaSubscriber requests data from FilterFuseableSubscriber (that is, the downstream LambdaSubscriber requests data from its upstream, FilterFuseableSubscriber), and FilterFuseableSubscriber in turn requests data from its own upstream. My question is:
@Override
public void request(long n) {
    s.request(n);
}
Why is the requested amount the same as the downstream's? If the downstream requests 5 items, I would expect the filter to request more than 5 from its upstream, since it will filter some of the data out. Can somebody explain the logic? :)
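For the demand question, here is a minimal sketch of the usual approach, written against the JDK's java.util.concurrent.Flow interfaces rather than Reactor's own classes: the filter forwards request(n) unchanged, and each time it drops an item it asks upstream for one replacement. CountingSource, FilterSubscriber and run are hypothetical names, and this approximates the idea; it is not Reactor's source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

class FilterDemandDemo {

    // Minimal source emitting 1, 2, 3, ... on demand; records the total demand it has seen.
    static final class CountingSource implements Flow.Subscription {
        final Flow.Subscriber<? super Integer> downstream;
        long requested;     // total demand received
        long pending;       // demand not yet served
        int next = 1;
        boolean draining;   // guards against re-entrant request() calls
        boolean cancelled;

        CountingSource(Flow.Subscriber<? super Integer> downstream) {
            this.downstream = downstream;
        }

        @Override public void request(long n) {
            requested += n;
            pending += n;
            if (draining) return;   // the running loop below picks up the extra demand
            draining = true;
            while (pending > 0 && !cancelled) {
                pending--;
                downstream.onNext(next++);
            }
            draining = false;
        }

        @Override public void cancel() { cancelled = true; }
    }

    // Filter: passes request(n) through as-is and requests one more item per dropped item.
    static final class FilterSubscriber implements Flow.Subscriber<Integer>, Flow.Subscription {
        final Flow.Subscriber<? super Integer> downstream;
        final Predicate<Integer> predicate;
        Flow.Subscription upstream;

        FilterSubscriber(Flow.Subscriber<? super Integer> downstream, Predicate<Integer> predicate) {
            this.downstream = downstream;
            this.predicate = predicate;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(this);
        }

        @Override public void onNext(Integer item) {
            if (predicate.test(item)) {
                downstream.onNext(item);
            } else {
                upstream.request(1);   // replace the dropped item
            }
        }

        @Override public void onError(Throwable t) { downstream.onError(t); }
        @Override public void onComplete() { downstream.onComplete(); }
        @Override public void request(long n) { upstream.request(n); }
        @Override public void cancel() { upstream.cancel(); }
    }

    // Requests `demand` items through an "is even" filter.
    // Returns { items delivered downstream, total demand seen by the source }.
    static long[] run(long demand) {
        List<Integer> received = new ArrayList<>();
        Flow.Subscriber<Integer> end = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
        FilterSubscriber filter = new FilterSubscriber(end, x -> x % 2 == 0);
        CountingSource source = new CountingSource(filter);
        filter.onSubscribe(source);
        return new long[] { received.size(), source.requested };
    }

    public static void main(String[] args) {
        long[] result = run(5);
        System.out.println("delivered=" + result[0] + ", upstream demand=" + result[1]);
    }
}
```

In this sketch run(5) delivers 5 even numbers while the source sees a total demand of 10: the original 5 plus one extra request per dropped odd number. So the filter never has to guess how many extra items it will need up front. As far as I know, Reactor also has an optimized path (ConditionalSubscriber.tryOnNext) where the source learns directly that an item was dropped and keeps emitting, but the effect on demand is the same.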
.onErrorResume then?