These are chat archives for ReactiveX/RxJava

23rd
Nov 2017
Eugene Popovich
@httpdispatch
Nov 23 2017 06:38

Has anybody solved the problem of automatically removing completed subscriptions from a CompositeSubscription, so that memory held by finished subscriptions is released? The only thing that comes to mind is something like this:

    public static <T> void subscribeWithAutoRemove(Observable<T> observable, Action1<T> action, CompositeSubscription compositeSubscription) {
        MutableObject<Subscription> subscriptionHolder = new MutableObject<>();
        subscriptionHolder.set(observable
                .doOnCompleted(() -> {
                    if (subscriptionHolder.isSet()) {
                        compositeSubscription.remove(subscriptionHolder.get());
                    }
                })
                .subscribe(action));
        if (!subscriptionHolder.get().isUnsubscribed()) {
            compositeSubscription.add(subscriptionHolder.get());
        }
    }

Maybe there is a better way?

David Karnok
@akarnokd
Nov 23 2017 08:43
A less hacky solution is to create a Subscriber up front and implement its onCompleted and onError to remove itself from the composite:
import rx.subjects.PublishSubject;

import org.junit.*;

import rx.*;
import rx.functions.Action1;
import rx.plugins.RxJavaHooks;
import rx.subscriptions.CompositeSubscription;

public class AutoRemoveSubscription {

    public static <T> void subscribeAutoRelease(
            Observable<T> source, 
            final Action1<T> onNext, 
            CompositeSubscription composite) {
        Subscriber<T> subscriber = new Subscriber<T>() {
            @Override
            public void onCompleted() {
                composite.remove(this);
            }
            @Override
            public void onError(Throwable e) {
                composite.remove(this);
                RxJavaHooks.onError(e);
            }
            @Override
            public void onNext(T t) {
                try {
                    onNext.call(t);
                } catch (Throwable ex) {
                    unsubscribe();
                    onError(ex);
                }
            }
        };
        composite.add(subscriber);
        source.subscribe(subscriber);
    }

    @Test
    public void test() {
        CompositeSubscription composite = new CompositeSubscription();

        PublishSubject<Integer> ps = PublishSubject.create();

        subscribeAutoRelease(ps, System.out::println, composite);

        Assert.assertTrue(composite.hasSubscriptions());

        ps.onNext(1);
        ps.onCompleted();

        Assert.assertFalse(composite.hasSubscriptions());
    }
}
Eugene Popovich
@httpdispatch
Nov 23 2017 08:54
@akarnokd thanks, that looks much better
Eugene Popovich
@httpdispatch
Nov 23 2017 09:11
@akarnokd is it possible to extend your solution to auto-release when unsubscribe is called, instead of or in addition to onCompleted?
Is this correct?
subscriber.add(new Subscription() {
    boolean unsubscribed = false;

    @Override public void unsubscribe() {
        unsubscribed = true;
        composite.remove(subscriber);
    }

    @Override public boolean isUnsubscribed() {
        return unsubscribed;
    }
});
Eugene Popovich
@httpdispatch
Nov 23 2017 09:34
Looks like that works. Here is the final solution, in case anybody needs it. Thanks @akarnokd for the great help.
    public static <T> Subscription subscribeAutoRelease(
            Observable<T> source,
            final Action1<T> onNext,
            CompositeSubscription composite) {
        Subscriber<T> subscriber = new Subscriber<T>() {
            @Override
            public void onCompleted() {
                composite.remove(this);
            }

            @Override
            public void onError(Throwable e) {
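                // Simulate the behaviour of Observable#subscribe(Action1): with no error handler,
                // the error is rethrown as OnErrorNotImplementedException.
                // Note: InternalObservableUtils lives in rx.internal.util and is not public API.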
                InternalObservableUtils.ERROR_NOT_IMPLEMENTED.call(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    onNext.call(t);
                } catch (Throwable ex) {
                    unsubscribe();
                    onError(ex);
                }
            }
        };
        subscriber.add(new Subscription() {
            // volatile: the flag may be read on a different thread than the one that unsubscribes
            volatile boolean unsubscribed = false;

            @Override public void unsubscribe() {
                unsubscribed = true;
                composite.remove(subscriber);
            }

            @Override public boolean isUnsubscribed() {
                return unsubscribed;
            }
        });
        composite.add(subscriber);
        return source.subscribe(subscriber);
    }

    @Test
    public void testSubscribeAutoRelease() {
        CompositeSubscription composite = new CompositeSubscription();

        PublishSubject<Integer> ps = PublishSubject.create();


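        // Case 1: calling unsubscribe() on the returned Subscription removes the subscriber from the composite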

        Subscription subscription = RxUtils.subscribeAutoRelease(ps, System.out::println, composite);

        Assert.assertTrue(composite.hasSubscriptions());

        ps.onNext(1);

        subscription.unsubscribe();

        Assert.assertFalse(composite.hasSubscriptions());

        // Case 2: completion of the source removes the subscriber from the composite

        RxUtils.subscribeAutoRelease(ps, System.out::println, composite);

        Assert.assertTrue(composite.hasSubscriptions());

        ps.onNext(1);
        ps.onCompleted();

        Assert.assertFalse(composite.hasSubscriptions());
    }
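
The removal order in the final solution can be followed without RxJava on the classpath. Below is a minimal, dependency-free sketch of the same idea, not RxJava code: `AutoReleaseSketch`, `Handle`, `MiniComposite`, and `SelfRemovingHandle` are hypothetical names made up for illustration. It assumes, as `CompositeSubscription.remove` does, that removing a handle also cancels it, and it uses an `AtomicBoolean` so that the mutual calls between `cancel()` and `remove()` terminate.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the auto-release pattern; none of these types are RxJava classes.
final class AutoReleaseSketch {

    /** Minimal stand-in for a subscription handle. */
    interface Handle {
        void cancel();
        boolean isCancelled();
    }

    /** Minimal stand-in for a composite: a set of handles. */
    static final class MiniComposite {
        private final Set<Handle> handles = new HashSet<>();

        synchronized void add(Handle h) {
            handles.add(h);
        }

        /** Removes the handle and cancels it; removing an absent handle is a no-op. */
        void remove(Handle h) {
            boolean removed;
            synchronized (this) {
                removed = handles.remove(h);
            }
            // Cancel outside the lock so the handle can call back into this composite.
            if (removed) {
                h.cancel();
            }
        }

        synchronized int size() {
            return handles.size();
        }
    }

    /** A handle that removes itself from its composite the first time it is cancelled. */
    static final class SelfRemovingHandle implements Handle {
        private final AtomicBoolean cancelled = new AtomicBoolean();
        private final MiniComposite owner;

        private SelfRemovingHandle(MiniComposite owner) {
            this.owner = owner;
        }

        /** Creates a handle and registers it with the given composite. */
        static SelfRemovingHandle register(MiniComposite owner) {
            SelfRemovingHandle h = new SelfRemovingHandle(owner);
            owner.add(h);
            return h;
        }

        @Override
        public void cancel() {
            // Only the first caller wins, so the cancel() -> remove() -> cancel() chain stops here.
            if (cancelled.compareAndSet(false, true)) {
                owner.remove(this);
            }
        }

        @Override
        public boolean isCancelled() {
            return cancelled.get();
        }
    }
}
```

Either entry point, cancelling the handle or removing it from the composite, leaves the composite empty. That is the same property the test above checks for `unsubscribe()` and `onCompleted()`.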