RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Hi,
I'm a bit confused about how doOnComplete works.
Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
        .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));
Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));
groupBy.flatMap(characterStringGroupedFlowable -> bufferAndDelay(stats(characterStringGroupedFlowable)))
        .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
        .test()
        .await();
private Flowable<String> stats(GroupedFlowable<Character, String> groupedFlowable) {
    return groupedFlowable.compose(upstream -> {
        AtomicLong count = new AtomicLong(0);
        return upstream
                .doOnNext(s -> {
                    count.incrementAndGet();
                    println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - [" + s + "] - count " + count.get());
                })
                .doOnComplete(() -> println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - complete - count " + count.get()));
    });
}
private Flowable<String> bufferAndDelay(Flowable<String> flowable) {
    return flowable.publish(stringFlowable ->
            stringFlowable.buffer(1)
                    .concatMap(bufferedStrings ->
                            Flowable.just(bufferedStrings)
                                    .doOnNext(x -> println(Thread.currentThread().getName() + " - before delay - " + x))
                                    .delay(100, TimeUnit.MILLISECONDS, Schedulers.computation())
                                    .doOnNext(x -> println(Thread.currentThread().getName() + " - after delay - " + x))
                                    .flatMap(Flowable::fromIterable)
                    ));
}
Output:
pool-1-thread-1 - group: a - stats - [aa] - count 1
pool-1-thread-1 - before delay - [aa]
pool-1-thread-1 - group: b - stats - [ba] - count 1
pool-1-thread-1 - before delay - [ba]
pool-1-thread-1 - group: a - stats - [ab] - count 2
pool-1-thread-1 - group: b - stats - [bb] - count 2
pool-1-thread-1 - end emit
pool-1-thread-1 - group: a - stats - complete - count 2
pool-1-thread-1 - group: b - stats - complete - count 2
RxComputationThreadPool-1 - after delay - [aa]
RxComputationThreadPool-2 - after delay - [ba]
RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
RxComputationThreadPool-1 - before delay - [ab]
RxComputationThreadPool-2 - before delay - [bb]
RxComputationThreadPool-4 - after delay - [bb]
RxComputationThreadPool-3 - after delay - [ab]
RxComputationThreadPool-3 - before delay - [ac]
RxComputationThreadPool-5 - after delay - [ac]
The result is that these lines appear in this order:
pool-1-thread-1 - group: a - stats - complete - count 2
pool-1-thread-1 - group: b - stats - complete - count 2
...
RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
Why does "stats - complete" come before "stats - [ac] - count 3"? Am I breaking the rule that says "emit events synchronously, not concurrently"?
When I remove publish from the bufferAndDelay method, the order is correct.
Shorter version:
Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
        .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));
Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));
groupBy.flatMap(characterStringGroupedFlowable -> {
            return stats(characterStringGroupedFlowable).publish(stringFlowable ->
                    stringFlowable.concatMap(s ->
                            Flowable.just(s)
                                    .delay(100, TimeUnit.MILLISECONDS)));
        })
        .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
        .test()
        .await();
Output:
pool-1-thread-1 - group: a - stats - [aa] - count 1
pool-1-thread-1 - group: b - stats - [ba] - count 1
pool-1-thread-1 - group: a - stats - [ab] - count 2
pool-1-thread-1 - group: b - stats - [bb] - count 2
pool-1-thread-1 - end emit
pool-1-thread-1 - group: a - stats - complete - count 2
pool-1-thread-1 - group: b - stats - complete - count 2
RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
Single to a POST API returning a Completable. I am not able to achieve that. I posted an SO question but have had no help so far. Please suggest how to do this: https://stackoverflow.com/questions/53633071/how-to-pass-output-of-a-single-to-a-completable-in-rxjava
With Completable.fromSingle(...single<completable> here...), the completable call is ignored.
With singleAPi.flatMapCompletable(result -> completableAPI(result)), I am able to get an object of type Completable, but the inner call happens on the main thread even if I do subscribeOn(Schedulers.io()).
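A minimal sketch of one way to control which thread the inner call runs on, assuming RxJava 2 (`io.reactivex`) is on the classpath. The inner Completable is subscribed on whichever thread delivers the Single's result, so one option is to apply subscribeOn(Schedulers.io()) to the inner Completable itself. The names `singleApi` and `completableApi` are hypothetical stand-ins for the real calls, and the actual cause in the original code may differ.

```java
import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.atomic.AtomicReference;

class SingleToCompletableSketch {

    // Hypothetical stand-in for the call that produces a value.
    static Single<String> singleApi() {
        return Single.fromCallable(() -> "payload");
    }

    // Hypothetical stand-in for the POST call; it records the thread that runs its body.
    static Completable completableApi(String input, AtomicReference<String> threadName) {
        return Completable.fromAction(() -> threadName.set(Thread.currentThread().getName()));
    }

    // Runs the chain and returns the name of the thread that executed the inner call.
    static String runAndReportInnerThread() {
        AtomicReference<String> threadName = new AtomicReference<>();
        singleApi()
                // subscribeOn is applied to the inner Completable, not only to the outer chain
                .flatMapCompletable(result ->
                        completableApi(result, threadName).subscribeOn(Schedulers.io()))
                .blockingAwait();
        return threadName.get();
    }

    public static void main(String[] args) {
        System.out.println("inner call ran on: " + runAndReportInnerThread());
    }
}
```

If the Single is followed by observeOn for a main-thread scheduler before flatMapCompletable, the inner call would otherwise be subscribed on that thread, which is one situation where the inner subscribeOn makes a difference.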
Single<Completable> to Completable
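A short sketch of the difference between the two approaches mentioned above, assuming RxJava 2 (`io.reactivex`). Completable.fromSingle only waits for the Single to succeed and discards the value it carries, so a Completable delivered as that value is never subscribed. Flattening with flatMapCompletable(inner -> inner) subscribes to it. The class and method names below are hypothetical and exist only for illustration.

```java
import io.reactivex.Completable;
import io.reactivex.Single;
import java.util.concurrent.atomic.AtomicBoolean;

class FlattenSingleOfCompletable {

    // Wraps the nested Single with fromSingle and reports whether the inner action ran.
    static boolean innerRanWithFromSingle() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Single<Completable> nested = Single.just(Completable.fromAction(() -> ran.set(true)));
        // Only the Single's success signal is observed; the inner Completable is dropped.
        Completable.fromSingle(nested).blockingAwait();
        return ran.get();
    }

    // Flattens the nested Single with flatMapCompletable and reports whether the inner action ran.
    static boolean innerRanWithFlatMap() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Single<Completable> nested = Single.just(Completable.fromAction(() -> ran.set(true)));
        // The inner Completable is returned from the mapper, so it is subscribed.
        nested.flatMapCompletable(inner -> inner).blockingAwait();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("fromSingle ran inner: " + innerRanWithFromSingle());
        System.out.println("flatMapCompletable ran inner: " + innerRanWithFlatMap());
    }
}
```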