RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
TimeUnit.SECONDS
stuff?
Hi
I have a problem with RxJavaPlugins.onScheduleHandler
.
When the thread pool is changed, the following method is invoked:
/**
 * Schedules {@code run} on a newly created worker after the given delay.
 *
 * <p>The runnable is first passed through {@code RxJavaPlugins.onSchedule}, and the
 * result is then wrapped in a {@code DisposeTask} tied to the worker; that task is
 * what gets handed to the worker and returned to the caller.
 *
 * @param run   the action to execute
 * @param delay how long to wait before executing, expressed in {@code unit}
 * @param unit  the time unit of {@code delay}
 * @return the {@code DisposeTask} that was scheduled
 */
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker worker = createWorker();
    // The plugin hook receives the caller's runnable; the DisposeTask wrapper is applied on top of its result.
    final DisposeTask disposeTask = new DisposeTask(RxJavaPlugins.onSchedule(run), worker);
    worker.schedule(disposeTask, delay, unit);
    return disposeTask;
}
My Runnable is wrapped in a DisposeTask, so I can't check the type of the action inside the schedule handler, because DisposeTask is package-private.
Do you have any idea how to solve this problem without reflection? :)
Has anybody solved the task of automatically removing completed subscriptions from a CompositeSubscription, in order to release the memory held by unused subscriptions? The only thing that comes to my mind is something like this:
/**
 * Subscribes {@code action} to {@code observable} and keeps the resulting subscription
 * in {@code compositeSubscription} until the observable completes.
 *
 * <p>A holder object is used so that the completion callback can reach the
 * subscription that only exists once {@code subscribe} has returned.
 *
 * @param observable            the sequence to subscribe to
 * @param action                callback invoked for every emitted item
 * @param compositeSubscription container the subscription is added to and removed from
 */
public static <T> void subscribeWithAutoRemove(Observable<T> observable, Action1<T> action, CompositeSubscription compositeSubscription) {
    final MutableObject<Subscription> holder = new MutableObject<>();

    Observable<T> withCleanup = observable.doOnCompleted(() -> {
        // The holder is still empty if completion happens while subscribe() is running.
        if (holder.isSet()) {
            compositeSubscription.remove(holder.get());
        }
    });
    holder.set(withCleanup.subscribe(action));

    // Nothing to track when the subscription has already ended.
    if (holder.get().isUnsubscribed()) {
        return;
    }
    compositeSubscription.add(holder.get());
}
Maybe there is a better way?
import rx.subjects.PublishSubject;
import org.junit.*;
import rx.*;
import rx.functions.Action1;
import rx.plugins.RxJavaHooks;
import rx.subscriptions.CompositeSubscription;
/**
 * Example of a subscriber that removes itself from a {@link CompositeSubscription}
 * once its source terminates, together with a test exercising it.
 */
public class AutoRemoveSubscription {

    /**
     * Subscribes to {@code source} with a subscriber that is registered in
     * {@code composite} and removes itself from it on completion or error.
     *
     * @param source    the sequence to subscribe to
     * @param onNext    callback invoked for every emitted item
     * @param composite container that holds the subscriber while the sequence is active
     */
    public static <T> void subscribeAutoRelease(
            Observable<T> source,
            final Action1<T> onNext,
            CompositeSubscription composite) {

        Subscriber<T> selfRemoving = new Subscriber<T>() {
            @Override
            public void onNext(T item) {
                try {
                    onNext.call(item);
                } catch (Throwable failure) {
                    // A failing callback ends the subscription and is reported as an error.
                    unsubscribe();
                    onError(failure);
                }
            }

            @Override
            public void onError(Throwable e) {
                composite.remove(this);
                RxJavaHooks.onError(e);
            }

            @Override
            public void onCompleted() {
                composite.remove(this);
            }
        };

        // Register before subscribing so a source that terminates during subscribe() can already remove it.
        composite.add(selfRemoving);
        source.subscribe(selfRemoving);
    }

    /** Checks that the subscriber is tracked while active and gone after completion. */
    @Test
    public void test() {
        CompositeSubscription tracked = new CompositeSubscription();
        PublishSubject<Integer> subject = PublishSubject.create();

        subscribeAutoRelease(subject, System.out::println, tracked);
        Assert.assertTrue(tracked.hasSubscriptions());

        subject.onNext(1);
        subject.onCompleted();
        Assert.assertFalse(tracked.hasSubscriptions());
    }
}
// Attach a child subscription to the subscriber whose unsubscribe() also removes
// the subscriber from the composite.
subscriber.add(new Subscription() {
    /** Set once unsubscribe() has been called on this child subscription. */
    boolean released = false;

    @Override
    public boolean isUnsubscribed() {
        return released;
    }

    @Override
    public void unsubscribe() {
        released = true;
        composite.remove(subscriber);
    }
});
/**
 * Subscribes to {@code source} with a subscriber that is registered in
 * {@code composite} and is removed from it again when the sequence completes or
 * when the subscriber is unsubscribed.
 *
 * @param source    the sequence to subscribe to
 * @param onNext    callback invoked for every emitted item
 * @param composite container that holds the subscriber while it is active
 * @return the subscription returned by {@code source.subscribe(...)}
 */
public static <T> Subscription subscribeAutoRelease(
        Observable<T> source,
        final Action1<T> onNext,
        CompositeSubscription composite) {

    Subscriber<T> subscriber = new Subscriber<T>() {
        @Override
        public void onNext(T item) {
            try {
                onNext.call(item);
            } catch (Throwable failure) {
                // A failing callback ends the subscription and is routed to onError.
                unsubscribe();
                onError(failure);
            }
        }

        @Override
        public void onError(Throwable e) {
            // Mirror what {@link Observable#subscribe(Action1)} does when no error handler is given.
            InternalObservableUtils.ERROR_NOT_IMPLEMENTED.call(e);
        }

        @Override
        public void onCompleted() {
            composite.remove(this);
        }
    };

    // Child subscription: unsubscribing the subscriber also takes it out of the composite.
    subscriber.add(new Subscription() {
        boolean released = false;

        @Override
        public boolean isUnsubscribed() {
            return released;
        }

        @Override
        public void unsubscribe() {
            released = true;
            composite.remove(subscriber);
        }
    });

    composite.add(subscriber);
    return source.subscribe(subscriber);
}
/**
 * Checks that the composite no longer holds the subscriber after an explicit
 * unsubscribe, and likewise after the source completes.
 */
@Test
public void testSubscribeAutoRelease() {
    CompositeSubscription tracked = new CompositeSubscription();
    PublishSubject<Integer> subject = PublishSubject.create();

    // Case 1: the caller unsubscribes explicitly.
    Subscription handle = RxUtils.subscribeAutoRelease(subject, System.out::println, tracked);
    Assert.assertTrue(tracked.hasSubscriptions());
    subject.onNext(1);
    handle.unsubscribe();
    Assert.assertFalse(tracked.hasSubscriptions());

    // Case 2: the source completes.
    RxUtils.subscribeAutoRelease(subject, System.out::println, tracked);
    Assert.assertTrue(tracked.hasSubscriptions());
    subject.onNext(1);
    subject.onCompleted();
    Assert.assertFalse(tracked.hasSubscriptions());
}
/**
 * Answers a fusion request: grants {@code SYNC} when the requested mode includes the
 * {@code SYNC} bit, and {@code NONE} otherwise.
 *
 * @param mode the requested fusion mode bits
 * @return {@code SYNC} if synchronous fusion was requested, otherwise {@code NONE}
 */
@Override
public int requestFusion(int mode) {
    // Anything without the SYNC bit is declined, leaving fusionMode untouched.
    if ((mode & SYNC) == 0) {
        return NONE;
    }
    fusionMode = true;
    return SYNC;
}
FromArrayDisposable
will be used as a queue; all poll() calls
will be made on the same thread on which FromArrayDisposable
was created, so no thread safety is required.