These are chat archives for ReactiveX/RxJava

20th
Aug 2015
arman yessenamanov
@yesenarman
Aug 20 2015 02:33

Hello, everyone.
Can someone please explain why do the following two snippets of code behave differently?

Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber))
    .flatMap(
        b -> Observable.<Integer>create(subscriber -> {
               subscriber.onNext(1);
               subscriber.onCompleted();
        }).subscribeOn(Schedulers.io()) // to imitate async request
    )
    .subscribe(System.out::println);
Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted))
    .flatMap(
        b -> Observable.<Integer>create(subscriber -> {
               subscriber.onNext(1);
               subscriber.onCompleted();
        }).subscribeOn(Schedulers.io()) // to imitate async request
    )
    .subscribe(System.out::println);

When I run the first one it doesn’t print anything to the console, but the second one prints 1 as expected.

Aaron Tull
@stealthcode
Aug 20 2015 17:52
@yesenarman your first example has a race condition. When your observable chain subscribes to the subscribeOn operator a new thread on the io scheduler subscribes upward to the flatmap but your main thread returns from the subscribe and terminates the program. hence why you don't see it print. The other thread hasn't onNexted yet.
you can fix the race condition by using a TestSubscriber and calling awaitTerminalEventAndUnsubscribeOnTimeout(int, TimeUnit) or simply adding a countdown latch and awaiting after your subscribe
Aaron Tull
@stealthcode
Aug 20 2015 19:22
@yesenarman oh and in production code you can call .toBlocking () to get access to operators that await a terminal event.