RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
toSingle and toObservable in a bunch of places. Somehow errors weren't propagating correctly and RxJavaHooks.onError was called (unexpectedly). I removed the toSingle and toObservable calls and now the errors are propagating. It seems like this is a bug, but in small test cases I haven't been able to reproduce this. Is there some way to 'extract' the full observable from a large application, so that it can be recreated inside a small test? Or rather, what is the best approach to reproduce/share this problem?
SafeSubscriber.done was true when the error occurred, which is why it called RxJavaHooks.onError. doOnError was called before toSingle, but not after toSingle. The observable that was converted to single did not emit any onNext, just the error.
doOnComplete, doOnNext, etc
Subscriber/Observer or just with subscribe() or subscribe(Action1)?
SafeSubscriber should not receive two onErrors or an onError after an onCompleted. Have you been using Observable.create() by any chance?
at rx.plugins.RxJavaHooks.onError(RxJavaHooks.java:299)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:150)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
at rx.Single$1$1.onError(Single.java:85)
at rx.internal.operators.OnSubscribeSingle$1.onError(OnSubscribeSingle.java:64)
at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:205)
at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:50)
at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10150)
at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:81)
at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:27)
at rx.Single$1.call(Single.java:90)
at rx.Single$1.call(Single.java:70)
at rx.Single.subscribe(Single.java:1839)
at rx.Single.subscribe(Single.java:1916)
at rx.Single$19$1.call(Single.java:1971)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
at java.lang.Thread.run(Thread.java:818)
Observable.creates in my code
Observable.fromAsync once I knew about the behaviour of Observable.create
.toSingle()
.subscribeOn(scheduler)
.toObservable()
.toSingle()
Schedulers.from(Executors.newSingleThreadExecutor())
at rx.plugins.RxJavaHooks.onError(RxJavaHooks.java:297)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:150)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
at rx.Single$1$1.onError(Single.java:84)
at rx.internal.operators.OnSubscribeSingle$1.onError(OnSubscribeSingle.java:64)
at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:216)
at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:50)
at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:81)
at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:27)
at rx.Single$1.call(Single.java:89)
at rx.Single$1.call(Single.java:69)
at rx.Single.subscribe(Single.java:1824)
at rx.Single.subscribe(Single.java:1901)
at rx.Single$19$1.call(Single.java:1956)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
at java.lang.Thread.run(Thread.java:818)
Observable.fromCallable. It's not really about the exception itself:
I'm throwing the exception from Observable.fromCallable and later down the chain I'm trying to catch it using onErrorResumeNext (it's a long chain, I'm trying to reproduce it now). This works without problems when everything stays Observable, but when I add:
.toSingle()
.subscribeOn(scheduler)
.toObservable()
.toSingle()
then the exception is not caught using onErrorResumeNext, but instead it is passed to RxJavaHooks.onError.
Schedulers.immediate()?
AndroidSchedulers.mainThread() amongst others
package rx;

import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.observers.TestSubscriber;
import rx.plugins.RxJavaHooks;
import rx.schedulers.Schedulers;

public class ProblemTestCase {

    Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());

    @After
    public void after() {
        RxJavaHooks.reset();
    }

    private Observable<String> createProblematicObservable() {
        return Observable.<String>fromCallable(() -> {
                    throw new IllegalStateException();
                })
                .toSingle()
                .subscribeOn(scheduler)
                .toObservable()
                .onErrorResumeNext(caughtError -> {
                    return Observable.just("OK");
                });
    }

    @Test
    public void testA() {
        TestSubscriber<Object> ts = new TestSubscriber<>();
        createProblematicObservable().subscribe(ts);
        ts.awaitTerminalEvent();
        ts.assertNoErrors();
        ts.assertValue("OK");
        ts.assertCompleted();
    }

    @Test
    public void testB() {
        AtomicBoolean isRxJavaHooksSetOnErrorCalled = new AtomicBoolean(false);
        RxJavaHooks.setOnError(throwable -> {
            isRxJavaHooksSetOnErrorCalled.set(true);
        });
        TestSubscriber<Object> ts = new TestSubscriber<>();
        createProblematicObservable().subscribe(ts);
        ts.awaitTerminalEvent();
        // We assert that RxJavaHooks.onError was *not* called, because Observable.onErrorResumeNext
        // should have been called.
        Assert.assertFalse(isRxJavaHooksSetOnErrorCalled.get());
        ts.assertNoErrors();
        ts.assertValue("OK");
        ts.assertCompleted();
    }
}
RxJavaHooks.onError is called for some reason
OK, I think I see what's going on, but... that's very confusing. It's hard to wrap my head around:
.toSingle()
.subscribeOn(scheduler)
.toObservable()
would seem functionally the same as:
.subscribeOn(scheduler)
except for RxJavaHooks.onError being called for the first case, but not for the second
subscribe call instead of unsafeSubscribe, the SafeSubscriber wrapper is appended.
yes, but hmm, this behaves differently:
Single.<String>fromCallable(() -> {
        throw new IllegalStateException();
    })
    .subscribeOn(scheduler)
    .toObservable()
    .onErrorResumeNext(throwable -> Observable.empty())
    .subscribe(ts)
than this:
Observable.<String>fromCallable(() -> {
        throw new IllegalStateException();
    })
    .subscribeOn(scheduler)
    .onErrorResumeNext(throwable -> Observable.empty())
    .subscribe(ts)
onError when .subscribe(...) catches an error, but not if toObservable catches one. Now toObservable also catches errors because it creates a SafeSubscriber (am I understanding this correctly?)
subscribe() call.
SafeSubscriber beyond the final subscribe call and I don't get an IllegalStateException printed
TestSubscriber handle errors already?
Observable.error(new TestException())
.subscribe(new TestSubscriber<Object>());
doesn't suppress the error, right? In that case RxJavaHooks.onError should be called, I presume?
RxJavaHooks.onError though
RxJavaHooks.onError.
.subscribe couldn't deliver the exception anywhere
RxJavaPlugins is still required?
RxJavaHooks for this?
RxJavaHooks.setOnError due to RxJavaPlugins being deprecated (see https://github.com/ReactiveX/RxJava/issues/4566#issuecomment-248253935), I'm guessing others will do the same and fall for the same problems.
SafeSubscriber._onError was more like a bug that's been with us from very early on. I would have dropped the call entirely, but we don't know if code is actually relying on getting all exceptions.
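The effect discussed in this thread can be illustrated without RxJava at all. The following is a simplified, self-contained model, not RxJava's actual code: `Observer`, `SafeObserver`, `globalErrorHook`, and `resumeWith` are hypothetical stand-ins for `rx.Observer`, `SafeSubscriber`, `RxJavaHooks.onError`, and `onErrorResumeNext`. It assumes, as the stack traces above suggest, that the safe wrapper reports each error to the global hook before forwarding it downstream. Under that assumption, a wrapper placed in the middle of a chain triggers the hook even though a later stage recovers from the error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SafeWrapperSketch {

    /** Minimal observer contract (hypothetical, loosely modelled on rx.Observer). */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Stand-in for a global error hook such as RxJavaHooks.onError (assumption). */
    static Consumer<Throwable> globalErrorHook = e -> { };

    /** Wrapper that reports every error to the global hook before forwarding it. */
    static final class SafeObserver<T> implements Observer<T> {
        private final Observer<T> actual;
        private boolean done;

        SafeObserver(Observer<T> actual) {
            this.actual = actual;
        }

        public void onNext(T value) {
            if (!done) {
                actual.onNext(value);
            }
        }

        public void onError(Throwable e) {
            if (!done) {
                done = true;
                globalErrorHook.accept(e); // the hook sees the error first
                actual.onError(e);         // then the error continues downstream
            }
        }

        public void onCompleted() {
            if (!done) {
                done = true;
                actual.onCompleted();
            }
        }
    }

    /** Stage that replaces an error with a fallback value, in the spirit of onErrorResumeNext. */
    static <T> Observer<T> resumeWith(T fallback, Observer<T> downstream) {
        return new Observer<T>() {
            public void onNext(T value) { downstream.onNext(value); }
            public void onError(Throwable e) {
                downstream.onNext(fallback);
                downstream.onCompleted();
            }
            public void onCompleted() { downstream.onCompleted(); }
        };
    }

    /** Sends one error through the chain and records hook calls and downstream events. */
    static List<String> run(boolean wrapMidChain) {
        List<String> events = new ArrayList<>();
        globalErrorHook = e -> events.add("hook:" + e.getClass().getSimpleName());
        Observer<String> end = new Observer<String>() {
            public void onNext(String value) { events.add("next:" + value); }
            public void onError(Throwable e) { events.add("error"); }
            public void onCompleted() { events.add("completed"); }
        };
        Observer<String> recovering = resumeWith("OK", end);
        Observer<String> target = wrapMidChain ? new SafeObserver<>(recovering) : recovering;
        target.onError(new IllegalStateException());
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [next:OK, completed]
        System.out.println(run(true));  // [hook:IllegalStateException, next:OK, completed]
    }
}
```

In both runs the final observer sees the fallback value and completes normally, which matches testA passing. Only the run with the mid-chain wrapper also reaches the hook, which is the situation testB asserts against.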