These are chat archives for ReactiveX/RxJava

29th
Sep 2016
Bob van der Linden
@bobvanderlinden
Sep 29 2016 13:25 UTC
I've got an application where I was using toSingle and toObservable in a bunch of places. Somehow errors weren't propagating correctly and RxJavaHooks.onError was called (unexpectedly). I removed the toSingle and toObservable calls and now the errors are propagating. It seems like this is a bug, but in small test cases I haven't been able to reproduce this. Is there some way to 'extract' the full observable from a large application, so that it can be recreated inside a small test? Or rather, what is the best approach to reproduce/share this problem?
Bob van der Linden
@bobvanderlinden
Sep 29 2016 13:32 UTC
just to give some more details about the problem: SafeSubscriber.done was true when the error occurred, which is why it called RxJavaHooks.onError. doOnError was called before toSingle, but not after toSingle. The observable that was converted to single did not emit any onNext, just the error.
oh, almost forgot, it also did not emit any onCompleted. This was checked using doOnCompleted, doOnNext, etc
David Karnok
@akarnokd
Sep 29 2016 18:28 UTC
@bobvanderlinden Did you subscribe with a full Subscriber/Observer or just with subscribe() or subscribe(Action1)?
Bob van der Linden
@bobvanderlinden
Sep 29 2016 18:29 UTC
it's subscribing to a subject
BehaviorSubject
David Karnok
@akarnokd
Sep 29 2016 18:31 UTC
What was the hook's stacktrace?
Can you give some hints about what the processing chain contained and why you had to go toSingle and back toObservable?
Bob van der Linden
@bobvanderlinden
Sep 29 2016 18:32 UTC
one second, i'll try to reproduce the stacktrace again
the toSingle and toObservable were essentially not needed, but I had some functions already written for Observable, whereas another interface exposed its results as Single. That's why I called toObservable and toSingle.
I was mostly wondering whether there is some way to log/track/display the chain of observables, so that I could more easily reproduce the chain in a smaller test
David Karnok
@akarnokd
Sep 29 2016 18:44 UTC
In 1.x there is no reliable link between subsequent operators that go upstream. In theory, you could write custom Observables that hook into a chain via RxJavaHooks.onCreate and let you capture information.
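As a rough illustration of the idea of a creation hook that captures information about a chain, here is a plain-Java sketch. It is not the RxJavaHooks API: `MiniSource`, `onCreate`, `create`, and `then` are hypothetical stand-ins, and the sketch only assumes that a single global function is invoked once for every source that gets assembled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified model only; none of these types are RxJava classes.
final class CreateHookSketch {

    /** Hypothetical stand-in for a source; label names the operator that built it. */
    static final class MiniSource {
        final String label;
        final MiniSource upstream; // null for the first source in a chain

        MiniSource(String label, MiniSource upstream) {
            this.label = label;
            this.upstream = upstream;
        }

        /** Models applying an operator: a new source is created on top of this one. */
        MiniSource then(String operatorLabel) {
            return create(operatorLabel, this);
        }
    }

    /** Hypothetical global creation hook; the default does nothing. */
    static UnaryOperator<MiniSource> onCreate = UnaryOperator.identity();

    /** Every source passes through the hook when it is assembled. */
    static MiniSource create(String label, MiniSource upstream) {
        return onCreate.apply(new MiniSource(label, upstream));
    }

    /** Assembles a three-step chain and returns the labels the hook recorded. */
    static List<String> run() {
        List<String> assembled = new ArrayList<>();
        onCreate = source -> {
            assembled.add(source.label); // capture information about the chain
            return source;
        };
        create("fromCallable", null).then("subscribeOn").then("onErrorResumeNext");
        onCreate = UnaryOperator.identity(); // restore the default hook
        return assembled;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A recorded list like this could then be used to rebuild the same chain in a small test.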
Bob van der Linden
@bobvanderlinden
Sep 29 2016 18:44 UTC
Hmm, that sounds like an interesting approach indeed
David Karnok
@akarnokd
Sep 29 2016 18:45 UTC
Reactor 3 does introspection and they can visualize an established flow graphically but that needs drastic changes in each operator.
SafeSubscriber should not receive two onErrors or an onError after an onCompleted. Have you been using Observable.create() by any chance?
Bob van der Linden
@bobvanderlinden
Sep 29 2016 18:48 UTC
I've gone through my code in search of Observable.create before, I'll do that again
Bob van der Linden
@bobvanderlinden
Sep 29 2016 18:54 UTC
The stack trace from RxJavaHooks.onError is:
    at rx.plugins.RxJavaHooks.onError(RxJavaHooks.java:299)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:150)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
    at rx.Single$1$1.onError(Single.java:85)
    at rx.internal.operators.OnSubscribeSingle$1.onError(OnSubscribeSingle.java:64)
    at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:205)
    at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:50)
    at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:33)
    at rx.Observable.unsafeSubscribe(Observable.java:10150)
    at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:81)
    at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:27)
    at rx.Single$1.call(Single.java:90)
    at rx.Single$1.call(Single.java:70)
    at rx.Single.subscribe(Single.java:1839)
    at rx.Single.subscribe(Single.java:1916)
    at rx.Single$19$1.call(Single.java:1971)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
    at java.lang.Thread.run(Thread.java:818)
I have not found any Observable.creates in my code
converted everything to Observable.fromAsync once I knew about the behaviour of Observable.create
also
The following broke it:
        .toSingle()
        .subscribeOn(scheduler)
        .toObservable()
        .toSingle()
scheduler is Schedulers.from(Executors.newSingleThreadExecutor())
I'll try to create a small test case that reproduces the same error again
Bob van der Linden
@bobvanderlinden
Sep 29 2016 19:09 UTC
ah, i see i'm also on 1.1.9, not 1.2.0
I'll upgrade first
David Karnok
@akarnokd
Sep 29 2016 19:19 UTC
That's odd. Judging from RxJavaHooks.java:299 it seems the error handler plugin crashed.
Bob van der Linden
@bobvanderlinden
I'm upgrading my codebase now to see whether the problem still occurs
still occurs on 1.2.0. Stack trace:
    at rx.plugins.RxJavaHooks.onError(RxJavaHooks.java:297)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:150)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
    at rx.Single$1$1.onError(Single.java:84)
    at rx.internal.operators.OnSubscribeSingle$1.onError(OnSubscribeSingle.java:64)
    at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:216)
    at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:50)
    at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:33)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:81)
    at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:27)
    at rx.Single$1.call(Single.java:89)
    at rx.Single$1.call(Single.java:69)
    at rx.Single.subscribe(Single.java:1824)
    at rx.Single.subscribe(Single.java:1901)
    at rx.Single$19$1.call(Single.java:1956)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
    at java.lang.Thread.run(Thread.java:818)
David Karnok
@akarnokd
Sep 29 2016 19:26 UTC
Yeah, it's the hook crashing. Can you share what the error message actually says?
Bob van der Linden
@bobvanderlinden
Sep 29 2016 19:28 UTC
RootNotAvailableException, but it's my own one that I'm throwing inside Observable.fromCallable. It's not really about the exception itself:

I'm throwing the exception from Observable.fromCallable and later down the chain I'm trying to catch it using onErrorResumeNext (it's a long chain, i'm trying to reproduce it now). This works without problems when everything stays Observable, but when I add:

        .toSingle()
        .subscribeOn(scheduler)
        .toObservable()
        .toSingle()

then the exception is not caught using onErrorResumeNext, but instead it is passed to RxJavaHooks.onError

sorry, if it's still unclear. The observable chain is somewhat long. I'll try to reproduce it in a smallish test. Otherwise I feel bad taking your time with this somewhat vague description
David Karnok
@akarnokd
Sep 29 2016 19:34 UTC
Does the problem still happen if you use Schedulers.immediate()?
Bob van der Linden
@bobvanderlinden
Sep 29 2016 19:36 UTC
hmm, yes it does. later down the chain .subscribeOn is called with AndroidSchedulers.mainThread() amongst others
Bob van der Linden
@bobvanderlinden
Sep 29 2016 20:06 UTC
package rx;

import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.observers.TestSubscriber;
import rx.plugins.RxJavaHooks;
import rx.schedulers.Schedulers;

public class ProblemTestCase {
    Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());

    @After
    public void after() {
        RxJavaHooks.reset();
    }

    private Observable<String> createProblematicObservable() {
        return Observable.<String>fromCallable(() -> {
                throw new IllegalStateException();
            })
            .toSingle()
            .subscribeOn(scheduler)
            .toObservable()
            .onErrorResumeNext(caughtError -> {
                return Observable.just("OK");
            });
    }

    @Test
    public void testA() {
        TestSubscriber<Object> ts = new TestSubscriber<>();

        createProblematicObservable().subscribe(ts);

        ts.awaitTerminalEvent();

        ts.assertNoErrors();
        ts.assertValue("OK");
        ts.assertCompleted();
    }

    @Test
    public void testB() {
        AtomicBoolean isRxJavaHooksSetOnErrorCalled = new AtomicBoolean(false);
        RxJavaHooks.setOnError(throwable -> {
            isRxJavaHooksSetOnErrorCalled.set(true);
        });
        TestSubscriber<Object> ts = new TestSubscriber<>();

        createProblematicObservable().subscribe(ts);

        ts.awaitTerminalEvent();

        // We assert that RxJavaHooks.onError was *not* called, because Observable.onErrorResumeNext
        // should have been called.
        Assert.assertFalse(isRxJavaHooksSetOnErrorCalled.get());

        ts.assertNoErrors();
        ts.assertValue("OK");
        ts.assertCompleted();
    }
}
testA passes, like I expect. testB does not. For testB RxJavaHooks.onError is called for some reason
note that the TestSubscriber does not contain any errors for testB (nor testA)
David Karnok
@akarnokd
Sep 29 2016 20:08 UTC
ReactiveX/RxJava#4332
Bob van der Linden
@bobvanderlinden
Sep 29 2016 20:09 UTC
aaah
David Karnok
@akarnokd
Sep 29 2016 20:10 UTC
SafeSubscriber always called the original hook.
Bob van der Linden
@bobvanderlinden
Sep 29 2016 20:11 UTC
hmm
David Karnok
@akarnokd
Sep 29 2016 20:14 UTC
The PR never appeared, I guess I have to do it myself now. The original hook was no-op.
Bob van der Linden
@bobvanderlinden
Sep 29 2016 20:14 UTC

ok I think I see what's going on, but... that's very confusing. it's hard to wrap my head around:

            .toSingle()
            .subscribeOn(scheduler)
            .toObservable()

would seem functionally the same as:

            .subscribeOn(scheduler)

except for RxJavaHooks.onError being called for the first case, but not for the second

I can give it a go if you want
but I'm not sure just another hook would be sufficient
David Karnok
@akarnokd
Sep 29 2016 20:15 UTC
if there is a plain subscribe call instead of unsafeSubscribe, the SafeSubscriber wrapper is appended.
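To make the difference between the two subscription paths concrete, here is a simplified plain-Java model. It is not RxJava's source: `MiniObserver`, `globalHook`, `safe`, `subscribe`, and `unsafeSubscribe` are hypothetical stand-ins, and the sketch assumes, as described in this conversation, that the safe wrapper reports every error to the global hook and then still delivers it downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model only; none of these types are RxJava classes.
final class SafeWrapperSketch {

    /** Hypothetical stand-in for the error side of a subscriber. */
    interface MiniObserver {
        void onError(Throwable e);
    }

    /** Hypothetical stand-in for a global error hook; the default is a no-op. */
    static Consumer<Throwable> globalHook = e -> { };

    /** Assumed wrapper behaviour: report to the hook, then deliver downstream. */
    static MiniObserver safe(MiniObserver actual) {
        return e -> {
            globalHook.accept(e);
            actual.onError(e);
        };
    }

    /** "Unsafe" path: the failing source talks to the observer directly. */
    static void unsafeSubscribe(Throwable failure, MiniObserver observer) {
        observer.onError(failure);
    }

    /** Plain path: the safe wrapper is put in front of the observer first. */
    static void subscribe(Throwable failure, MiniObserver observer) {
        unsafeSubscribe(failure, safe(observer));
    }

    /** Runs one of the two paths and returns what was observed, in order. */
    static List<String> run(boolean plainSubscribe) {
        List<String> log = new ArrayList<>();
        globalHook = e -> log.add("hook:" + e.getMessage());
        MiniObserver downstream = e -> log.add("downstream:" + e.getMessage());
        Throwable failure = new IllegalStateException("boom");
        if (plainSubscribe) {
            subscribe(failure, downstream);
        } else {
            unsafeSubscribe(failure, downstream);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

In this model the downstream observer receives the error on both paths, but only the plain path also reaches the hook. That matches the symptom in testB, where the error was recovered downstream and the hook still fired.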
Bob van der Linden
@bobvanderlinden
Sep 29 2016 20:17 UTC

yes, but hmm, this behaves differently:

Single.<String>fromCallable(() -> {
                throw new IllegalStateException();
            })
            .subscribeOn(scheduler)
            .toObservable()
            .onErrorResumeNext(throwable -> Observable.empty())
            .subscribe(ts)

than this:

Observable.<String>fromCallable(() -> {
                throw new IllegalStateException();
            })
            .subscribeOn(scheduler)
            .onErrorResumeNext(throwable -> Observable.empty())
            .subscribe(ts)
^-- edited to ignore errors
i'd expect an onError when .subscribe(...) catches an error, but not if toObservable catches one. Now toObservable also catches errors because it creates a SafeSubscriber (am I understanding this correctly?)
David Karnok
@akarnokd
Sep 29 2016 20:23 UTC
Oddly, Single.toObservable doesn't do much but to wrap Single.onSubscribe into an Observable (they use the same actual subscribe action type). Let me see if there is more than one plain subscribe() call.
David Karnok
@akarnokd
Sep 29 2016 20:29 UTC
ReactiveX/RxJava#4641
Bob van der Linden
@bobvanderlinden
Sep 29 2016 20:31 UTC
Wow, that was quick! Thanks a lot!
David Karnok
@akarnokd
Sep 29 2016 20:31 UTC
There is no excess call with any of your last examples to SafeSubscriber beyond the final subscribe call and I don't get an IllegalStateException printed
Bob van der Linden
@bobvanderlinden
Sep 29 2016 20:33 UTC
hmm, not sure about the testcase though. Does TestSubscriber handle errors already?
David Karnok
@akarnokd
Sep 29 2016 20:35 UTC
It consumes them for later validation but with onErrorResumeNext, there is no error.
Bob van der Linden
@bobvanderlinden
Sep 29 2016 20:38 UTC
Observable.error(new TestException())
.subscribe(new TestSubscriber<Object>());
doesn't suppress the error right? In that case RxJavaHooks.onError should be called I presume?
I might be misunderstanding the meaning of RxJavaHooks.onError though
David Karnok
@akarnokd
Sep 29 2016 20:38 UTC
Yes, it calls onError no matter what TestSubscriber does after that
RxJavaHooks.onError is supposed to be called when an exception can't be properly delivered to an Observer due to the Observable protocol
for example, you plain merge two Observables and both error one after another. In this case, the first one gets delivered and the second one goes into the RxJavaHooks.
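The merge case can be sketched in plain Java as follows. This is a minimal model of the protocol rule that an observer gets at most one terminal event, not RxJava's merge implementation; `MergedObserver`, `downstream`, and `lastResortHook` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Simplified model only; none of these types are RxJava classes.
final class UndeliverableErrorSketch {

    /** Hypothetical merged observer that accepts at most one terminal error. */
    static final class MergedObserver {
        private final AtomicBoolean terminated = new AtomicBoolean();
        private final Consumer<Throwable> downstream;
        private final Consumer<Throwable> lastResortHook;

        MergedObserver(Consumer<Throwable> downstream, Consumer<Throwable> lastResortHook) {
            this.downstream = downstream;
            this.lastResortHook = lastResortHook;
        }

        void onError(Throwable e) {
            if (terminated.compareAndSet(false, true)) {
                downstream.accept(e);     // first error: delivered normally
            } else {
                lastResortHook.accept(e); // later errors: cannot be delivered
            }
        }
    }

    /** Two merged sources fail one after another; returns what happened, in order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MergedObserver merged = new MergedObserver(
                e -> log.add("delivered:" + e.getMessage()),
                e -> log.add("hook:" + e.getMessage()));
        merged.onError(new RuntimeException("first"));
        merged.onError(new RuntimeException("second"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the second error ends up in the hook, because the downstream observer has already terminated by the time it arrives.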
Bob van der Linden
@bobvanderlinden
Sep 29 2016 20:40 UTC
oohw, that's good to know as well
David Karnok
@akarnokd
Sep 29 2016 20:40 UTC
It's like a last line of defense against losing important errors.
Bob van der Linden
@bobvanderlinden
Sep 29 2016 20:40 UTC
okok
ok, so if Rx can deliver the exception to the subscriber, then it is not delivered to RxJavaHooks.onError.
my 'instinctive' first reaction to that testcase of https://github.com/ReactiveX/RxJava/pull/4641/files#diff-06343da0b75afa1cd1a7239dc7680703R1087 is that it's incorrect
but if the definition of RxJavaHooks.onError is that it is only called for exceptions that are not sent to the subscriber, then it seems good. It does need a very careful description though. At the moment it seems like a catch-all for when .subscribe couldn't deliver the exception anywhere
I guess for that purpose RxJavaPlugins is still required?
Bob van der Linden
@bobvanderlinden
Sep 29 2016 20:47 UTC
Do we need a separate hook in RxJavaHooks for this?
I moved to RxJavaHooks.setOnError due to RxJavaPlugins being deprecated (see https://github.com/ReactiveX/RxJava/issues/4566#issuecomment-248253935), I'm guessing others will do the same and fall for the same problems.
David Karnok
@akarnokd
Sep 29 2016 20:51 UTC
RxJavaPlugins is (unfortunately) an established way of testing and we can't drop it in the 1.x train.
That SafeSubscriber._onError was more like a bug that's been with us from very early on. I would have dropped the call entirely but we don't know if code is actually relying on getting all exceptions.
Bob van der Linden
@bobvanderlinden
Sep 29 2016 20:55 UTC
ah k, that makes sense
ok, i'll look forward to rxjava 2 then and replace RxJavaHooks.onError with RxJavaPlugins again.
https://github.com/ReactiveX/RxJava/wiki/Plugins does need an update for onError though. It isn't the catch-all error handler in this instance
David Karnok
@akarnokd
Sep 29 2016 21:04 UTC
Sure. Updated the wiki.
Bob van der Linden
@bobvanderlinden
Sep 29 2016 21:05 UTC
ah, that example also makes things a bit more clear for me
thanks again!