RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
zip()
is what you need I guess http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Single.html#zip-java.lang.Iterable-io.reactivex.functions.Function-
<T> Completable whenAll(List<Single<T>> singles) {
return Single.merge(singles).ignoreElements();
}
io.reactivex.Completable
will notify you about completaion over onError
or onComplete
. If you are required to return exectly Single
type, you may replace ignoreElements
with last
<T> Single<T> whenAll(List<Single<T>> singles, T fallback) {
return Single.merge(singles).last(fallback);
}
Singles
one by one in order in which they were passed you should refer to next example
<T> Flowable whenAll(List<Single<T>> singles) {
return Single.concat(singles);
}
.errorStrategyContinue
, but you may join the discussion and propose and additional resume operator with fallback -> reactor/reactor-core#629
Another question. Is there any operator is equivalent andThen
for Completable
in Observable
? For example i use this really often
API.login
.doOnNext(//save the login datas)
.ignoreElements()
.andThen(//other observable Calls)
I think concatWith is somehow the good one but it need downstream be the same type.
just
. Since these come through onNext
, there is guarantted to be one producer of them. The drain will then read from this SPSC queue and the queues of the inner Subscriber
s from a guaranteed single thread at a time.
Ah gotcha! So the main queue is only responsible for scalars from the following line:
if (t instanceof ScalarSynchronousObservable) {
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
}
Each inner will have its own queue and not use the main. @akarnokd ?
flatDir
, I can't seem to use the Java 9 part.
Hi,
Was wondering if the community could help me on how I could parallelize the 3 calls shown in the code below?
Observable<CheckoutDetailsDTO> o = Observable.zip(
orderIntegrationService.getRxOrderByOrderId(orderId),
shippingIntegrationService.getRxAddressListForUser(),
paymentIntegrationService.getRxPaymentInfoForUser(),
(order, addresslist, paymntinfo) -> new CheckoutDetailsDTO(order, addresslist, paymntinfo));
The calls to order, shipping and payment happen to be called one after another in the code above, is there any way it can be parallelized as the 3 calls are independent of each other ?
orderIntegrationService.getRxOrderByOrderId(orderId).subscribeOn(Schedulers.io()),
shippingIntegrationService.getRxAddressListForUser().subscribeOn(Schedulers.io()),
paymentIntegrationService.getRxPaymentInfoForUser().subscribeOn(Schedulers.io()),