These are chat archives for ReactiveX/RxJava

28th
Nov 2018
lukaszguz
@lukaszguz
Nov 28 2018 10:22

Hi,
I'm a bit confused about how doOnComplete works.

Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
        .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));

Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));

groupBy.flatMap(characterStringGroupedFlowable -> bufferAndDelay(stats(characterStringGroupedFlowable)))
        .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
        .test()
        .await();

private Flowable<String> stats(GroupedFlowable<Character, String> groupedFlowable) {
    return groupedFlowable.compose(upstream -> {
        AtomicLong count = new AtomicLong(0);
        return upstream
                .doOnNext(s -> {
                    count.incrementAndGet();
                    println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - [" + s + "] - count " + count.get());
                })
                .doOnComplete(() -> println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - complete - count " + count.get()));
    });
}

private Flowable<String> bufferAndDelay(Flowable<String> flowable) {
    return flowable.publish(stringFlowable ->
            stringFlowable.buffer(1)
                    .concatMap(bufferedStrings ->
                            Flowable.just(bufferedStrings)
                                    .doOnNext(x -> println(Thread.currentThread().getName() + " - before delay - " + x))
                                    .delay(100, TimeUnit.MILLISECONDS, Schedulers.computation())
                                    .doOnNext(x -> println(Thread.currentThread().getName() + " - after delay - " + x))
                                    .flatMap(Flowable::fromIterable)
                    ));
}

Output:

pool-1-thread-1 - group: a - stats - [aa] - count 1
pool-1-thread-1 - before delay - [aa]
pool-1-thread-1 - group: b - stats - [ba] - count 1
pool-1-thread-1 - before delay - [ba]
pool-1-thread-1 - group: a - stats - [ab] - count 2
pool-1-thread-1 - group: b - stats - [bb] - count 2
pool-1-thread-1 - end emit
pool-1-thread-1 - group: a - stats - complete - count 2
pool-1-thread-1 - group: b - stats - complete - count 2
RxComputationThreadPool-1 - after delay - [aa]
RxComputationThreadPool-2 - after delay - [ba]
RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
RxComputationThreadPool-1 - before delay - [ab]
RxComputationThreadPool-2 - before delay - [bb]
RxComputationThreadPool-4 - after delay - [bb]
RxComputationThreadPool-3 - after delay - [ab]
RxComputationThreadPool-3 - before delay - [ac]
RxComputationThreadPool-5 - after delay - [ac]

The relevant part of the result is:

pool-1-thread-1 - group: a - stats - complete - count 2
pool-1-thread-1 - group: b - stats - complete - count 2
...
RxComputationThreadPool-1 - group: a - stats - [ac] - count 3

Why is "stats - complete" printed before "stats - [ac] - count 3"? Am I breaking the rule that says "emit events synchronously, not concurrently"?
When I remove publish from the bufferAndDelay method, the order is correct.

lukaszguz
@lukaszguz
Nov 28 2018 12:13

Shorter version:

Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
        .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));

Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));

groupBy.flatMap(characterStringGroupedFlowable -> {
            return stats(characterStringGroupedFlowable).publish(stringFlowable ->
                    stringFlowable.concatMap(s ->
                            Flowable.just(s)
                                    .delay(100, TimeUnit.MILLISECONDS)));
        })
        .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
        .test()
        .await();

Output:

pool-1-thread-1 - group: a - stats - [aa] - count 1
pool-1-thread-1 - group: b - stats - [ba] - count 1
pool-1-thread-1 - group: a - stats - [ab] - count 2
pool-1-thread-1 - group: b - stats - [bb] - count 2
pool-1-thread-1 - end emit
pool-1-thread-1 - group: a - stats - complete - count 2
pool-1-thread-1 - group: b - stats - complete - count 2
RxComputationThreadPool-1 - group: a - stats - [ac] - count 3

Dan O'Reilly
@dano
Nov 28 2018 15:44
Once a disposable has been added to CompositeDisposable, am I correct in thinking that the CompositeDisposable will keep a reference to it until it's explicitly removed? Meaning, if the source of a given Disposable that has been added to the CompositeDisposable terminates, the Disposable can't get garbage collected as long as the CompositeDisposable keeps the reference to it?
Dan O'Reilly
@dano
Nov 28 2018 16:42
Looks like subscribeAutoDispose from RxJava2Extensions will make sure the reference gets removed.
I think that's what I want.
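
The retention concern above can be illustrated with a small, dependency-free model. This is only a sketch: SimpleComposite, PlainSubscription and AutoRemovingSubscription are hypothetical stand-ins invented for illustration, not RxJava or RxJava2Extensions classes. It assumes the container holds strong references until an entry is explicitly deleted, and shows the general "remove yourself on termination" idea, not how any library actually implements it.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Dependency-free model; every type here is hypothetical and is NOT RxJava's API.
class CompositeRetentionSketch {

    /** Stand-in for a disposable resource. */
    interface Resource {
        void dispose();
    }

    /** Stand-in for a composite container: keeps strong references until delete() is called. */
    static final class SimpleComposite {
        private final Set<Resource> resources = new LinkedHashSet<>();

        synchronized void add(Resource r) { resources.add(r); }

        synchronized void delete(Resource r) { resources.remove(r); }

        synchronized int size() { return resources.size(); }
    }

    /** Subscription whose source can finish without the container being told. */
    static final class PlainSubscription implements Resource {
        boolean terminated;

        void complete() { terminated = true; }

        @Override public void dispose() { terminated = true; }
    }

    /** Subscription that registers itself and deletes itself again when its source terminates. */
    static final class AutoRemovingSubscription implements Resource {
        private final SimpleComposite owner;
        boolean terminated;

        AutoRemovingSubscription(SimpleComposite owner) {
            this.owner = owner;
            owner.add(this);
        }

        void complete() {
            terminated = true;
            owner.delete(this); // drop the container's reference on termination
        }

        @Override public void dispose() {
            terminated = true;
            owner.delete(this);
        }
    }

    /** The source completes, but the container still references the subscription. */
    static int sizeAfterPlainCompletion() {
        SimpleComposite composite = new SimpleComposite();
        PlainSubscription subscription = new PlainSubscription();
        composite.add(subscription);
        subscription.complete();
        return composite.size();
    }

    /** The source completes and the subscription removes itself from the container. */
    static int sizeAfterAutoRemovingCompletion() {
        SimpleComposite composite = new SimpleComposite();
        AutoRemovingSubscription subscription = new AutoRemovingSubscription(composite);
        subscription.complete();
        return composite.size();
    }

    public static void main(String[] args) {
        System.out.println("plain: " + sizeAfterPlainCompletion());                // plain: 1
        System.out.println("auto-removing: " + sizeAfterAutoRemovingCompletion()); // auto-removing: 0
    }
}
```

In the plain case the entry stays in the set after completion, so it cannot be garbage collected while the container is reachable; in the auto-removing case the set is empty again once the source terminates.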