These are chat archives for ReactiveX/RxJava

6th
Oct 2016
Eugene Popovich
@httpdispatch
Oct 06 2016 07:48

Is it OK that a subscriber may receive items even after unsubscription?

@Test public void testUnsubscribe() throws InterruptedException {
    PublishSubject<Boolean> mBooleanPublishSubject = PublishSubject.create();
    Subscription subscription = mBooleanPublishSubject
            .doOnNext(value -> System.out.println("Sending value " + value))
            .onBackpressureBuffer()
            .observeOn(Schedulers.computation())
            .map(v -> {
                long sum = 0;
                for (int i = 0; i < 10000000; i++) {
                    sum += i;
                }
                return sum > 0;
            })
            .doOnNext(__ -> System.out.println("Before subscribe"))
            .subscribe(value -> System.out.println("Value received " + value));

    mBooleanPublishSubject.onNext(true);
    mBooleanPublishSubject.onNext(true);
    Thread.sleep(10);
    System.out.println("Unsubscribing");
    subscription.unsubscribe();
    System.out.println("Unsubscribed");
    Thread.sleep(1000);
}

Output

Sending value true
Sending value true
Unsubscribing
Unsubscribed
Before subscribe
Value received true
Dorus
@Dorus
Oct 06 2016 08:14
@httpdispatch Just the one that is in progress, I guess. Most events will be silently discarded.
Eugene Popovich
@httpdispatch
Oct 06 2016 08:15
Yes, it is in progress. But there are at least two calls after the unsubscription, doOnNext and subscribe, which should be ignored.
If I replace map with concatMap, everything works as expected:
@Test public void testUnsubscribe() throws InterruptedException {
    PublishSubject<Boolean> mBooleanPublishSubject = PublishSubject.create();
    Subscription subscription = mBooleanPublishSubject
            .doOnNext(value -> System.out.println("Sending value " + value))
            .onBackpressureBuffer()
            .observeOn(Schedulers.computation())
            .concatMap(v -> Observable.fromCallable(() -> {
                System.out.println("concatMap: start");
                long sum = 0;
                for (int i = 0; i < 100000000; i++) {
                    sum += i;
                }
                System.out.println("concatMap: end");
                return sum > 0;
            }))
            .doOnNext(__ -> System.out.println("Before subscribe"))
            .subscribe(value -> System.out.println("Value received " + value));

    mBooleanPublishSubject.onNext(true);
    mBooleanPublishSubject.onNext(true);
    Thread.sleep(30);
    System.out.println("Unsubscribing");
    subscription.unsubscribe();
    System.out.println("Unsubscribed");
    Thread.sleep(1000);
}
Output
Sending value true
Sending value true
concatMap: start
Unsubscribing
Unsubscribed
concatMap: end
Dorus
@Dorus
Oct 06 2016 09:25
I think the spec states that unsubscription should happen on a best-effort basis. You can, for example, also add observeOn before subscribe to swallow it.
@httpdispatch
Eugene Popovich
@httpdispatch
Oct 06 2016 09:43
Using @akarnokd's suggestion, I've implemented an operator to overcome the issue:
https://github.com/ReactiveX/RxJava/issues/4673#issuecomment-251914837
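For readers following along, here is a minimal plain-JDK sketch of the idea discussed above. It is not the operator from the linked comment and does not use RxJava at all. It assumes the stage checks the unsubscribed flag only before the long-running work starts, which is roughly what the map example seems to show, and it contrasts that with a stage that re-checks the flag after the work finishes. The names `UnsubscribeGuardSketch`, `Flag`, `unguarded`, `guarded` and `run` are hypothetical and chosen for illustration only. The "unsubscribe during the computation" is simulated inside the work function so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;

public class UnsubscribeGuardSketch {

    /** Hypothetical stand-in for a subscription: just a cancellation flag. */
    static final class Flag {
        private final AtomicBoolean unsubscribed = new AtomicBoolean();
        void unsubscribe() { unsubscribed.set(true); }
        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    /** Checks the flag only before the work starts; a late result is still delivered. */
    static <T, R> Consumer<T> unguarded(Flag flag, Function<T, R> work, Consumer<R> downstream) {
        return item -> {
            if (flag.isUnsubscribed()) {
                return; // items arriving after unsubscription are dropped
            }
            R result = work.apply(item);
            downstream.accept(result); // delivered even if the flag flipped during work
        };
    }

    /** Re-checks the flag after the work and drops results that finished too late. */
    static <T, R> Consumer<T> guarded(Flag flag, Function<T, R> work, Consumer<R> downstream) {
        return item -> {
            if (flag.isUnsubscribed()) {
                return;
            }
            R result = work.apply(item);
            if (!flag.isUnsubscribed()) {
                downstream.accept(result);
            }
        };
    }

    /** Pushes two items through one stage while "unsubscribing" in the middle of the first. */
    public static List<Boolean> run(boolean useGuard) {
        Flag flag = new Flag();
        List<Boolean> received = new ArrayList<>();
        Function<Boolean, Boolean> slowWork = value -> {
            flag.unsubscribe(); // simulates unsubscribe() arriving mid-computation
            return value;
        };
        Consumer<Boolean> downstream = received::add;
        Consumer<Boolean> stage;
        if (useGuard) {
            stage = guarded(flag, slowWork, downstream);
        } else {
            stage = unguarded(flag, slowWork, downstream);
        }
        stage.accept(true); // the in-progress item
        stage.accept(true); // arrives after unsubscription, dropped in both variants
        return received;
    }

    public static void main(String[] args) {
        System.out.println("unguarded received: " + run(false)); // prints [true]
        System.out.println("guarded received:   " + run(true));  // prints []
    }
}
```

In the unguarded variant the in-progress item still reaches the downstream consumer, matching the first output above. The guarded variant drops it, which is the kind of behavior the custom operator is meant to provide.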