Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Yuriy Tim
    @tim4dev
    I was wondering why this does not work only on Android 8.0
    Rafael Guillen
    @rguillens

    When I use Observable.fromArray(array).flatMap(element-> fromCallable(() -> element.value) and element.value is null flatMap swalows the error and continues, but if I change just this: fromCallable(() -> element.value).doOn*() throws the exception.

    I'm wondering if this is the RxJava 2 expected behavior, here is the code:

    public class FlatMapExample {
    static class Pair<K extends Object, V extends Object> {
            private final K key;
            private final V value;
    
            Pair(K key, V value) {
                this.key = key;
                this.value = value;
            }
    
            public K getKey() {
                return key;
            }
    
            public V getValue() {
                return value;
            }
        }
    
        public static void main(String[] args) {
            Pair<String, String> ONE = new Pair<>("1", "ONE");
            Pair<String, String> TWO = new Pair<>("2", null);
            Pair<String, String> THREE = new Pair<>("3", "THREE");
            Pair<String, String> FOUR = new Pair<>("4", "FOUR");
    
            Pair[] array = {ONE, TWO, THREE, FOUR};
    
            withoutOnXOperator(array); 
    
            sleep(5L);
    
            withOnXOperator(array); //Throw error
    
            sleep(5L);
        }
        /**
        /* Throw errors
        */
        private static void withOnXOperator(Pair[] array) {
            System.out.println("With doOn* Operators");
            Observable.fromArray(array)
                    .flatMap(
                            pair -> Observable.fromCallable(() -> pair.getValue())
                                    .doOnNext(notification -> System.out.println("Event: " + notification)) //Apply doOn* operator
                    )
                    .cast(String.class)
                    .subscribe(new PrintObserver("With doOn*"));
        }
    
        /**
        /* Doesn't throw errors
        */
        private static void withoutOnXOperator(Pair[] array) {
            System.out.println("Without doOn* Operators*");
            Observable.fromArray(array)
                    .flatMap(pair -> Observable.fromCallable(() -> pair.getValue()))
                    .cast(String.class)
                    .subscribe(new PrintObserver("Without doOn*"));
        }
    
        static class PrintObserver implements Observer<String> {
            private final String name;
    
            PrintObserver(String name) {
                this.name = name;
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println(name + " Subscribed!");
            }
    
            @Override
            public void onNext(String s) {
                System.out.println(name + " onNext: " + s);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println(name + " error: " + e.getMessage());
            }
    
            @Override
            public void onComplete() {
                System.out.println(name + " Done!");
            }
        }
    
        private static void sleep(long seconds) {
            try {
                Thread.sleep(seconds * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    Leonardo Lima
    @leonardortlima
    Both of them printed ONE and hit onError for me.
    Rafael Guillen
    @rguillens

    Thanks for the feedback, I'm using Java 8 and RxJava 2.1.0 and this is my output:

    Without doOn* Operators*
    Without doOn* Subscribed!
    Without doOn* onNext: ONE
    Without doOn* onNext: THREE
    Without doOn* onNext: FOUR
    Without doOn* Done!
    With doOn* Operators
    With doOn* Subscribed!
    Event: ONE
    With doOn* onNext: ONE
    With doOn* error: Callable returned null

    With RxJava 2.1.4 works fine!!!

    Rafael Guillen
    @rguillens
    Yes, it was solved in Pull Request 5517, thanks a lot!
    Viktor Sinelnikov
    @asinel

    Hi, I have a problem. Chain of calls stops, and I don't understand why.

    RxActivityResult.on(activity)
                                    .startIntent(Intent(activity, Recaptcha2Activity::class.java))
                                    .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
                                    .subscribe({ }, { })

    this code works, I am getting "onNext" method called. But when I am putting it into chain, next "flatMap" isn't called

    .flatMap {
                        if(it.requiresCaptcha) {
                            RxActivityResult.on(activity)
                                    .startIntent(Intent(activity, Recaptcha2Activity::class.java))
                                    .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
                        } else {
                            Observable.just("")
                        }
                    }
                    .flatMap { api.publish(id!!, null) }

    What can be a reason? I see in debugger that RxActivityResult... starts, in deep of the library onNext method called, but it is not continued to next .flatMap
    Thank you for answers

    David Karnok
    @akarnokd
    You could add a doOnEvent({ println(it) }) before the last flatMap to see what events reach that point. Then start moving it up the chain and into the inner lambda.
    Viktor Sinelnikov
    @asinel
    I did, and there is no events
    but if it.requiresCaptcha is false, everything is ok
    David Karnok
    @akarnokd
    That means something gets lost inside RxActivityResult. I can't help you with that as it is a 3rd party component.
    Viktor Sinelnikov
    @asinel
    I exlored code of this library, used debugger, and I see that it calls subject.onNext(), the same as in first example.
    David Karnok
    @akarnokd
    What type of subject? If its a PublishSubject, then it is possible a signal happens before the top flatMap even subscribes to it.
    Viktor Sinelnikov
    @asinel
    Yes, it's PublishSubject
    David Karnok
    @akarnokd
    You can debug RxActivityResult and see if there is a subscriber to PublishSubject at that point or not.
    Viktor Sinelnikov
    @asinel
    @akarnokd I see 1 subscriber when subject calls onNext
    Also I tried to do a hack:
    .flatMap(object: Function<RequiresCaptchaResponse, Observable<String>> {
                        override fun apply(it: RequiresCaptchaResponse): Observable<String> {
                            val subject = PublishSubject.create<String>()
                            if (it.requiresCaptcha) {
                                RxActivityResult.on(activity)
                                        .startIntent(Intent(activity, Recaptcha2Activity::class.java))
                                        .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
                                        .subscribe({ subject.onNext(it) })
                                return subject
                            } else {
                                return Observable.just("")
                            }
                        }
                    })
    .flatMap { api.publishSendIt(id!!, it) }
    I am getting a callback in "subsribe", so it 100% calls subject.onNext
    But I don't get a call into next flatMap
    David Karnok
    @akarnokd
    Try ReplaySubject in your hack.
    Viktor Sinelnikov
    @asinel
    Tried, no success(
    David Karnok
    @akarnokd
    In that case, please provide a runnable application that demonstrates this problem.
    Amr Elmasry
    @AmrElmasry

    Hello, I have a question about the operators implementation,

    @Override
            public void onSubscribe(Disposable s) {
                if (DisposableHelper.validate(this.s, s)) {
                    this.s = s;
                    actual.onSubscribe(this);
                }
            }

    I understand that this line DisposableHelper.validate(this.s, s) is to assert that the value from upstream isn't null and it wasn't set before, but why in this line actual.onSubscribe(this); we passed this ? why not just passing s ? what is the need for implementing Disposable ?

    Exerosis
    @Exerosis
    Is there a version of RxJava 2 that uses java functional interfaces?
    David Karnok
    @akarnokd
    @AmrElmasry Many operators have to intercept dispose, but otherwise it is required by RxJava so operator fusion doesn't skip your operator by talking to the upstream's Disposable directly.
    @Exerosis No. RxJava has to run on Java 6 where those interfaces are not available. If there is going to be a RxJava 3 with Java 9, we may still use custom interfaces as the Java 8 functional interfaces don't allow throwing checked exceptions - it has become a convenience with RxJava 2 to be able to throw at will from lambdas.
    Amr Elmasry
    @AmrElmasry
    @akarnokd Thanks
    Haris Kljajić
    @devharis
    Is it possible to get current index/position when using .flatMapIterable?
    Or is there any other option to get a index on each iteration of a list
    On each iteration I want to call a method to notify current progress which in this case is the index.
    David Stocking
    @dmstocking
    I don't know if its idiomatic, but have you tried just zipping your data together with an Observable that just counts from 0 to size-1.
    Amir Mahmoud
    @amebrahimi
    Hello Guys, does any one know a good course for learning RxJava and RxAndroid?
    Dmitriy Manzhosov
    @manzhda
    +1
    Gundelino85
    @Gundelino85
    hi guys! :)
    I tried many hours now :worried: , maybe someone can help me. I have to get x files from a server, but only want to download 4 at the same time. These downloads are observables. And when i downloaded all of them (no difference if they fail or not) i get the message, that the download is complete. i found zip and flatMapWithMaxConcurrent but i dont know how to use them with each other. Maybe other methods are even better. I hope someone can help me :)
    Dorus
    @Dorus
    @Gundelino85 flatMap has overloads with the maxConcurrency parameter. Just set that to 4 and use the flatMap as normal.
    Gundelino85
    @Gundelino85
    you again :D
    ok, let me explain what i have. I use retrofit and get a list with UUIDs in the first request. In that subscription i want to do what i wrote above. But i can not get this "complete state" after everything is finished
    *in the first request i get the list of IDs for the PDFs i want to download...
    Dorus
    @Dorus
    @Gundelino85 yeah, but my RxJava knowledge is a bit more limited. However the answer i gave in the other chat should still apply. You could do something like:
    initialRequest()
      .flatMap(arr -> Observable.from(arr)
        .flatMap(id -> otherRequest(id), bool, 4)
      )
    Gundelino85
    @Gundelino85
    but then i subscribe on each request and not on the package, or am i wrong?
    Ethan Le
    @quanlt

    Hello guys! i have a problem with rxjava.
    I have a list of image url and i need to convert it to jpeg, so i tried

    Observable.fromIterable(entries)
           .concatMap(entry->Observable.just(ImageUtils.convertFile(entry.getPath())))
           .subscribeOn(Schedulers.io())

    in ImageUtils.convertFile, i decode bitmap then convert it to jpeg.
    It worked well on my Galaxy S7 Edge :tada:, but it throw out of memory exception in BitmapFactory.decodeFile (ImageUtils) :( on low-end devices.
    At first, i though it's because BitmapFactory.decodeFile use too much memory, but then i realize it can be error of Schedulers.io().
    Do you have any advice to solve this problem? Thank you so much:D

    Tabiul
    @tabiul
    @quanlt why do you think it have to do with Schedulers.io ?
    I am new to Rx and RxJava, I am trying a very simple example Observable<String> ob = Observable.<String>create(t -> t.onNext("1")). concatWith(Observable.create(t -> t.onNext("2"))); ob.subscribe(System.out::println);. For some reason it only prints 1. Any idea why ?
    Ethan Le
    @quanlt
    @tabiul i think it's because Schedulers.io have a limited memory, and decode bitmap consumes many memory
    i'm quite new with rxjava too :(
    ViTORossonero
    @ViTORossonero

    @quanlt Taking into account that you are working with bitmaps, I'd say that Schedulers#io has nothing to do with OOM in this case

    code that you provided shouldn't process few bitmaps in parallel so you'd rather check your ImageUtils#convertFile