These are chat archives for ReactiveX/RxJava

26th
Sep 2017
Rafael Guillen
@rguillens
Sep 26 2017 19:32

When I use Observable.fromArray(array).flatMap(element-> fromCallable(() -> element.value) and element.value is null flatMap swalows the error and continues, but if I change just this: fromCallable(() -> element.value).doOn*() throws the exception.

I'm wondering if this is the RxJava 2 expected behavior, here is the code:

public class FlatMapExample {
static class Pair<K extends Object, V extends Object> {
        private final K key;
        private final V value;

        Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public K getKey() {
            return key;
        }

        public V getValue() {
            return value;
        }
    }

    public static void main(String[] args) {
        Pair<String, String> ONE = new Pair<>("1", "ONE");
        Pair<String, String> TWO = new Pair<>("2", null);
        Pair<String, String> THREE = new Pair<>("3", "THREE");
        Pair<String, String> FOUR = new Pair<>("4", "FOUR");

        Pair[] array = {ONE, TWO, THREE, FOUR};

        withoutOnXOperator(array); 

        sleep(5L);

        withOnXOperator(array); //Throw error

        sleep(5L);
    }
    /**
    /* Throw errors
    */
    private static void withOnXOperator(Pair[] array) {
        System.out.println("With doOn* Operators");
        Observable.fromArray(array)
                .flatMap(
                        pair -> Observable.fromCallable(() -> pair.getValue())
                                .doOnNext(notification -> System.out.println("Event: " + notification)) //Apply doOn* operator
                )
                .cast(String.class)
                .subscribe(new PrintObserver("With doOn*"));
    }

    /**
    /* Doesn't throw errors
    */
    private static void withoutOnXOperator(Pair[] array) {
        System.out.println("Without doOn* Operators*");
        Observable.fromArray(array)
                .flatMap(pair -> Observable.fromCallable(() -> pair.getValue()))
                .cast(String.class)
                .subscribe(new PrintObserver("Without doOn*"));
    }

    static class PrintObserver implements Observer<String> {
        private final String name;

        PrintObserver(String name) {
            this.name = name;
        }

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println(name + " Subscribed!");
        }

        @Override
        public void onNext(String s) {
            System.out.println(name + " onNext: " + s);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(name + " error: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println(name + " Done!");
        }
    }

    private static void sleep(long seconds) {
        try {
            Thread.sleep(seconds * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Leonardo Lima
@leonardortlima
Sep 26 2017 20:07
Both of them printed ONE and hit onError for me.
Rafael Guillen
@rguillens
Sep 26 2017 20:08

Thanks for the feedback, I'm using Java 8 and RxJava 2.1.0 and this is my output:

Without doOn* Operators*
Without doOn* Subscribed!
Without doOn* onNext: ONE
Without doOn* onNext: THREE
Without doOn* onNext: FOUR
Without doOn* Done!
With doOn* Operators
With doOn* Subscribed!
Event: ONE
With doOn* onNext: ONE
With doOn* error: Callable returned null

With RxJava 2.1.4 works fine!!!

Rafael Guillen
@rguillens
Sep 26 2017 20:23
Yes, it was solved in Pull Request 5517, thanks a lot!
Viktor Sinelnikov
@asinel
Sep 26 2017 21:26

Hi, I have a problem. Chain of calls stops, and I don't understand why.

RxActivityResult.on(activity)
                                .startIntent(Intent(activity, Recaptcha2Activity::class.java))
                                .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
                                .subscribe({ }, { })

this code works, I am getting "onNext" method called. But when I am putting it into chain, next "flatMap" isn't called

.flatMap {
                    if(it.requiresCaptcha) {
                        RxActivityResult.on(activity)
                                .startIntent(Intent(activity, Recaptcha2Activity::class.java))
                                .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
                    } else {
                        Observable.just("")
                    }
                }
                .flatMap { api.publish(id!!, null) }

What can be a reason? I see in debugger that RxActivityResult... starts, in deep of the library onNext method called, but it is not continued to next .flatMap
Thank you for answers