RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
TimeUnit.SECONDS stuff?
Hi, I have a problem with RxJavaPlugins.onScheduleHandler. When the thread pool changes, the following method is invoked:
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}
My Runnable is wrapped in a DisposeTask, so I can't check the type of the action inside the schedule handler, because DisposeTask is package-private.
Do you have any idea how to solve this problem without reflection? :)
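One thing that might help here, as a rough sketch only: in the method quoted above, the hook is applied to the raw Runnable before DisposeTask wraps it, so the handler should get one call in which the original type is still visible. The plain-Java model below is not RxJava code. ScheduleHookSketch, WrapperTask, TrackedAction and workerSchedule are hypothetical names, and the second hook call inside workerSchedule is an assumption about how the worker behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the scheduling path quoted above; not RxJava code.
class ScheduleHookSketch {
    // Plays the role of the handler installed through the plugin hook.
    static Function<Runnable, Runnable> handler = r -> r;

    static Runnable onSchedule(Runnable run) {
        return handler.apply(run);
    }

    // Plays the role of the package-private DisposeTask wrapper.
    private static final class WrapperTask implements Runnable {
        final Runnable inner;
        WrapperTask(Runnable inner) { this.inner = inner; }
        @Override public void run() { inner.run(); }
    }

    // Marker type the application owns and can test for with instanceof.
    interface TrackedAction extends Runnable { }

    // Mirrors scheduleDirect: decorate the raw action first, then wrap it.
    static void scheduleDirect(Runnable run) {
        Runnable decorated = onSchedule(run);
        Runnable task = new WrapperTask(decorated);
        workerSchedule(task);
    }

    // Assumption: the worker passes its task through the hook a second time.
    static void workerSchedule(Runnable task) {
        onSchedule(task).run();
    }

    // Records what the handler sees on each invocation.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        handler = r -> {
            seen.add(r instanceof TrackedAction ? "tracked" : "other");
            return r;
        };
        TrackedAction action = () -> { };
        scheduleDirect(action);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If the real scheduler behaves like this model, the handler could act on the call where its own marker type shows up and simply pass every unknown wrapper through untouched.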
Has anybody solved automatic removal of completed subscriptions from a CompositeSubscription, to release the memory held by unused entries? The only thing that comes to my mind is something like this:
public static <T> void subscribeWithAutoRemove(Observable<T> observable, Action1<T> action, CompositeSubscription compositeSubscription) {
    MutableObject<Subscription> subscriptionHolder = new MutableObject<>();
    subscriptionHolder.set(observable
            .doOnCompleted(() -> {
                if (subscriptionHolder.isSet()) {
                    compositeSubscription.remove(subscriptionHolder.get());
                }
            })
            .subscribe(action));
    if (!subscriptionHolder.get().isUnsubscribed()) {
        compositeSubscription.add(subscriptionHolder.get());
    }
}
Maybe there is a better way?
import rx.subjects.PublishSubject;
import org.junit.*;
import rx.*;
import rx.functions.Action1;
import rx.plugins.RxJavaHooks;
import rx.subscriptions.CompositeSubscription;
public class AutoRemoveSubscription {
    public static <T> void subscribeAutoRelease(
            Observable<T> source,
            final Action1<T> onNext,
            CompositeSubscription composite) {
        Subscriber<T> subscriber = new Subscriber<T>() {
            @Override
            public void onCompleted() {
                composite.remove(this);
            }
            @Override
            public void onError(Throwable e) {
                composite.remove(this);
                RxJavaHooks.onError(e);
            }
            @Override
            public void onNext(T t) {
                try {
                    onNext.call(t);
                } catch (Throwable ex) {
                    unsubscribe();
                    onError(ex);
                }
            }
        };
        composite.add(subscriber);
        source.subscribe(subscriber);
    }
    @Test
    public void test() {
        CompositeSubscription composite = new CompositeSubscription();
        PublishSubject<Integer> ps = PublishSubject.create();
        subscribeAutoRelease(ps, System.out::println, composite);
        Assert.assertTrue(composite.hasSubscriptions());
        ps.onNext(1);
        ps.onCompleted();
        Assert.assertFalse(composite.hasSubscriptions());
    }
}
subscriber.add(new Subscription() {
    boolean unsubscribed = false;
    @Override public void unsubscribe() {
        unsubscribed = true;
        composite.remove(subscriber);
    }
    @Override public boolean isUnsubscribed() {
        return unsubscribed;
    }
});
public static <T> Subscription subscribeAutoRelease(
        Observable<T> source,
        final Action1<T> onNext,
        CompositeSubscription composite) {
    Subscriber<T> subscriber = new Subscriber<T>() {
        @Override
        public void onCompleted() {
            composite.remove(this);
        }
        @Override
        public void onError(Throwable e) {
            // simulate same behaviour as in the method {@link Observable#subscribe(Action1)}
            InternalObservableUtils.ERROR_NOT_IMPLEMENTED.call(e);
        }
        @Override
        public void onNext(T t) {
            try {
                onNext.call(t);
            } catch (Throwable ex) {
                unsubscribe();
                onError(ex);
            }
        }
    };
    subscriber.add(new Subscription() {
        boolean unsubscribed = false;
        @Override public void unsubscribe() {
            unsubscribed = true;
            composite.remove(subscriber);
        }
        @Override public boolean isUnsubscribed() {
            return unsubscribed;
        }
    });
    composite.add(subscriber);
    return source.subscribe(subscriber);
}
@Test
public void testSubscribeAutoRelease() {
    CompositeSubscription composite = new CompositeSubscription();
    PublishSubject<Integer> ps = PublishSubject.create();
    // case 1: an explicit unsubscribe() removes the subscriber from the composite
    Subscription subscription = RxUtils.subscribeAutoRelease(ps, System.out::println, composite);
    Assert.assertTrue(composite.hasSubscriptions());
    ps.onNext(1);
    subscription.unsubscribe();
    Assert.assertFalse(composite.hasSubscriptions());
    // case 2: completion of the source removes the subscriber as well
    RxUtils.subscribeAutoRelease(ps, System.out::println, composite);
    Assert.assertTrue(composite.hasSubscriptions());
    ps.onNext(1);
    ps.onCompleted();
    Assert.assertFalse(composite.hasSubscriptions());
}
@Override
public int requestFusion(int mode) {
    if ((mode & SYNC) != 0) {
        fusionMode = true;
        return SYNC;
    }
    return NONE;
}
When FromArrayDisposable is used as a queue, every poll is called on the same thread on which FromArrayDisposable was created, so no thread safety is required. The run method looks like this:
void run() {
    T[] a = array;
    int n = a.length;
    for (int i = 0; i < n && !isDisposed(); i++) {
        T value = a[i];
        if (value == null) {
            actual.onError(new NullPointerException("The " + i + "th element is null"));
            return;
        }
        actual.onNext(value);
    }
    if (!isDisposed()) {
        actual.onComplete();
    }
}
@Override
public void subscribeActual(Observer<? super T> s) {
    FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
    s.onSubscribe(d);
    if (d.fusionMode) {
        return;
    }
    d.run();
}
FromArrayDisposable is created on the same thread on which the run method is called.
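To make the fused path in the snippets above a bit more concrete, here is a simplified plain-Java model, offered as a sketch rather than the library's code. FusedArraySketch and drain are hypothetical names. The constants are defined locally and assumed to match RxJava 2's fusion constants. The consumer requests SYNC fusion and, if it is granted, drains the array by polling on its own thread instead of waiting for run to push items.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of an array source that supports SYNC fusion.
class FusedArraySketch<T> {
    // Defined locally; assumed to mirror RxJava 2's fusion mode constants.
    static final int NONE = 0;
    static final int SYNC = 1;
    static final int ASYNC = 2;

    private final T[] array;
    private int index;
    boolean fusionMode;

    FusedArraySketch(T[] array) { this.array = array; }

    // Grants fusion only if the SYNC bit is set in the requested mode.
    int requestFusion(int mode) {
        if ((mode & SYNC) != 0) {
            fusionMode = true;
            return SYNC;
        }
        return NONE;
    }

    // Returns the next element, or null when the array is exhausted.
    T poll() {
        int i = index;
        if (i == array.length) {
            return null;
        }
        index = i + 1;
        return array[i];
    }

    boolean isEmpty() { return index == array.length; }

    void clear() { index = array.length; }

    // A consumer that was granted SYNC fusion drains by polling on its own thread.
    static <T> List<T> drain(FusedArraySketch<T> source, int requestedMode) {
        List<T> out = new ArrayList<>();
        if (source.requestFusion(requestedMode) == SYNC) {
            T v;
            while ((v = source.poll()) != null) {
                out.add(v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new FusedArraySketch<>(new Integer[] {1, 2, 3}), SYNC));
    }
}
```

Note that index is a plain field with no synchronization: the model, like the question above, relies on a single consumer thread doing all the polling.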
With Observables, only a certain level of thread safety is usually required. The Disposable you hand to Observer.onSubscribe must be thread safe, and your calls to Observer.onNext, onError and onComplete have to happen sequentially (no concurrent invocations). FromArrayDisposable's only thread safety comes from the volatile boolean disposed, which may be set by dispose() calls coming from any thread. Since there is no backpressure (and ignoring fusion), the array items are emitted on the thread that called the operator's subscribe method, and emission doesn't leave that thread until all items have been emitted or the disposed flag is found to be true. Therefore, no additional thread-safety measures are required. When fusion is enabled, the caller of poll must ensure that poll, isEmpty and clear() are called sequentially.
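The non-fused case described above can be sketched in plain Java roughly like this. It is a simplified model, not the actual FromArrayDisposable. ArrayEmitterSketch, completed and demo are hypothetical names. The only shared state is the volatile flag, and everything else stays on the emitting thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of the non-fused emission loop.
class ArrayEmitterSketch<T> {
    private final T[] array;
    // The only cross-thread state: may be set by dispose() from any thread.
    private volatile boolean disposed;
    // Written only by the emitting thread, so it needs no synchronization here.
    boolean completed;

    ArrayEmitterSketch(T[] array) { this.array = array; }

    void dispose() { disposed = true; }

    boolean isDisposed() { return disposed; }

    // Emits on the calling thread, checking the flag before each item.
    void run(Consumer<? super T> onNext) {
        for (int i = 0; i < array.length && !isDisposed(); i++) {
            onNext.accept(array[i]);
        }
        if (!isDisposed()) {
            completed = true;
        }
    }

    // Disposes from inside the consumer once the value 2 has been seen.
    static List<Integer> demo() {
        ArrayEmitterSketch<Integer> emitter = new ArrayEmitterSketch<>(new Integer[] {1, 2, 3, 4});
        List<Integer> received = new ArrayList<>();
        emitter.run(v -> {
            received.add(v);
            if (v == 2) {
                emitter.dispose();
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the demo the dispose() call comes from the consumer itself, but because the flag is volatile the same loop would also notice a dispose() issued from another thread before emitting the next item.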