These are chat archives for ReactiveX/RxJava

20th
May 2016
David Karnok
@akarnokd
May 20 2016 07:17
This test passes:
        Action0 action = mock(Action0.class);

        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

        AsyncOnSubscribe.createStateless(new Action2<Long, Observer<Observable<? extends Integer>>>() {
            @Override
            public void call(Long t1, Observer<Observable<? extends Integer>> t2) {
                t2.onCompleted();
            }
        }, action).call(ts);

        verify(action).call();
Ozzy Osborne
@BarDweller
May 20 2016 13:58
I wrote myself a testcase, convinced it wasn't working this morning.. the test case passed (you'll be glad to hear) .. my real code still appeared to fail.. I dug further.. here are the results of the canadian jury.. ;p
Ozzy Osborne
@BarDweller
May 20 2016 14:04
If you build an AsyncOnSubscribe via createStateless and do not pass an unsubscribe hook, but instead wire up the same hook via Observable.create(AsyncOnSubscribe.createStateless(callback)).doOnUnsubscribe(unsubscribeHook) .. then as soon as a user invokes unsubscribe, you get your callback, and you can do whatever you need to unblock / free up the 'callback' function that may be looping calling a blocking fn with a timeout, and testing a 'do we still need to do this' flag..
But.. if you take that same unsubscribe hook, and instead pass it as part of the createStateless call like.. Observable.create(AsyncOnSubscribe.createStateless(callback,unsubscribeHook)) Then your users unsubscribe invocations will not be passed on to your unsubscribeHook until your blocking call method has returned.. (which if you were relying on the unsubscribe to shut it down.. may happen a lot 'later')
That's what was confusing me so much yesterday, because my callback is polling for messages from a queue, which are infrequent.. so most of my unsubscribe manual experiments never saw unsubscribe called, because the blocking call to the callback never returned.
As an aside, for an Observer like this (I think it's a hot observer, as it always emits events) .. my usage pattern is pretty much to just subscribe/unsubscribe from the observable.. does onCompleted have any meaning in this scenario? the underlying messages never have an 'ending' as such.
David Karnok
@akarnokd
May 20 2016 15:13
Okay, I see. The AsyncOnSubscribe does in-sequence unsubscription, as you specified. I can't really tell if this can be changed to independent unsubscription or not; since these are user-provided functions, an async cancel may interfere with a concurrently blocking/running generator callback. The operator using, as far as I remember, does anytime unsubscription to allow, for example, closing an InputStream and break the blocking on its read().