These are chat archives for ReactiveX/RxJava

May 2016
Ozzy Osborne
May 19 2016 20:04
Hi.. I'm trying to build a custom Observable, using AsyncOnSubscribe.createStateless, and it seems to work.. (yay).. until I try to unsubscribe.. and my unsubscribe handler is never called.. I'm new to RXJava, so may have got this completely upside down.. but if I try something like Subscriber sub = myClass.getObserver().doOnUnsubscribe( () -> System.out.println("Fish") ).subscribe( s -> System.out.println(s) ); .. Fish is printed immediately.. which is odd, because my Observer is still emitting events.. calling sub.unsubscribe() does nothing.. when I'd expect it to invoke the unsubscribe action I passed as the 2nd arg to createStateless ...
David Karnok
May 19 2016 21:11
@BarDweller could you post an unit test that demonstrates this problem?
Ozzy Osborne
May 19 2016 21:12
Hi.. I think it's related to my lack of understanding of rxjava's threading model
I had my asynconsubscribe 'call' method using another thread to obtain data, and the other thread invoked the observer.onNext .. instead now I have that perform the obtain of the data in the same thread that invoked call, and use Observable.subscribeOn(...) to unblock my subscriber
so now .. if I create my Observable like... Observable<T> o = Observable.create(AsyncOnSubscribe.createStateless( myDataProvider) ).doOnUnsubScribe( ()->System.out.println("Fish").subscribeOn(Schedulers.from(myExecutor)) ... it all works.
although most interestingly.. I cannot get an unsubscribe handler to work if I pass it as a 2nd arg to createStateless..
so maybe I'm still doing something wrong there.. but then AsyncOnSubscribe is still @experimental
Ozzy Osborne
May 19 2016 21:17
(although also the only real offered way to implement an observable with backpressure handling etc)