Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Stas Shusha
    @journeyman
    found an answer: related to backpressure changes in rxjava 1.0.14
    @akarnokd oh, thankx)
    David Karnok
    @akarnokd
    You are welcome, twice I guess ;)
    Stas Shusha
    @journeyman
    sure)
    though you haven't propose any workarounds
        fun <T> Observable<T>.replayForced() : ConnectableObservable<T> {
            val upstream = this.replay()
            val sub = upstream.subscribe()
            upstream.doOnUnsubscribe { sub.unsubscribe() }
            return upstream
        }
    holydragon2009
    @holydragon2009
    I have 5 retrofit observables. After finished each of observables, keep running 5 observables to write on local database. Then I need to receive all of results from those observables and subscribe to UI thread later. How to do that? I am a newbie with RxJava. Please help me.
    Yuriy Tim
    @tim4dev

    Hi. I want to wrap a listener to Observable.

    Full code here https://github.com/tim4dev/dirty_code/blob/master/Rx-Listener-to-Observable/app/src/main/java/tim4dev/com/rxlistenertoobservable/MainActivity.java

    On Android 4.2 "Observable Listener" (Observable, Flowable) worked fine.
    On Android 8.0 "Observable Listener" (Observable, Flowable) called 2--3 times and dead.

    Steps to reproduce the problem:

    • set PHONE_NUMBER to your own phone (yes, you will phone call to yourself)
    • run app on Android 8
    • call
    • press disconnect (off hook)
    • if you do it fast enough - everything will be ok
    • if you listen to short beeps and wait, the Listener dies and does not react any more.

    Question: what to do?

    Joshua Street
    @jjstreet
    ahoy mateys
    im struggling to interleave multiple flowables
    into a single flowable
    i ultimately end up with one flowable emitting everything, then another one emitting
    i used flatMap
    but its not really working right
    Joshua Street
    @jjstreet
    bah.. i just did a small sample.. and the sample works just fine
    Joshua Street
    @jjstreet
    if i converted each flowable into a completable view flatMapCompletable and used Completable.amb(), the interleaving works just fine.
    so goofy
    David Karnok
    @akarnokd
    @tim4dev I see two problems:
    1) you didn't initialize mDisposable
    2) the emitter can track only one resource, thus calling setCancellable and setDisposable together will cancel/dispose the previous resource. Use a CompositeDisposable, add resources to it and then call setDisposable() once.
    @jjstreet flatMap doesn't guarantee interleaving and otherwise it depends on the emission frequency of the sources. An attempt at interleaving two range() will not work with flatMap. There is an extension operator that lets you merge streams in a round-robin fashion which can result a better interleaving, still no 100% guarantee.
    Yuriy Tim
    @tim4dev

    @akarnokd

    @tim4dev I see two problems:
    1) you didn't initialize [mDisposable]

    I apologize for that. But this code was commented out.
    I've tried a lot of things really.
    Here is the actual code: https://gist.github.com/tim4dev/e7b207ff85f1798bd10fe6f6b2f0514c

    Yuriy Tim
    @tim4dev
    I was wondering why this does not work only on Android 8.0
    Rafael Guillen
    @rguillens

    When I use Observable.fromArray(array).flatMap(element-> fromCallable(() -> element.value) and element.value is null flatMap swalows the error and continues, but if I change just this: fromCallable(() -> element.value).doOn*() throws the exception.

    I'm wondering if this is the RxJava 2 expected behavior, here is the code:

    public class FlatMapExample {
    static class Pair<K extends Object, V extends Object> {
            private final K key;
            private final V value;
    
            Pair(K key, V value) {
                this.key = key;
                this.value = value;
            }
    
            public K getKey() {
                return key;
            }
    
            public V getValue() {
                return value;
            }
        }
    
        public static void main(String[] args) {
            Pair<String, String> ONE = new Pair<>("1", "ONE");
            Pair<String, String> TWO = new Pair<>("2", null);
            Pair<String, String> THREE = new Pair<>("3", "THREE");
            Pair<String, String> FOUR = new Pair<>("4", "FOUR");
    
            Pair[] array = {ONE, TWO, THREE, FOUR};
    
            withoutOnXOperator(array); 
    
            sleep(5L);
    
            withOnXOperator(array); //Throw error
    
            sleep(5L);
        }
        /**
        /* Throw errors
        */
        private static void withOnXOperator(Pair[] array) {
            System.out.println("With doOn* Operators");
            Observable.fromArray(array)
                    .flatMap(
                            pair -> Observable.fromCallable(() -> pair.getValue())
                                    .doOnNext(notification -> System.out.println("Event: " + notification)) //Apply doOn* operator
                    )
                    .cast(String.class)
                    .subscribe(new PrintObserver("With doOn*"));
        }
    
        /**
        /* Doesn't throw errors
        */
        private static void withoutOnXOperator(Pair[] array) {
            System.out.println("Without doOn* Operators*");
            Observable.fromArray(array)
                    .flatMap(pair -> Observable.fromCallable(() -> pair.getValue()))
                    .cast(String.class)
                    .subscribe(new PrintObserver("Without doOn*"));
        }
    
        static class PrintObserver implements Observer<String> {
            private final String name;
    
            PrintObserver(String name) {
                this.name = name;
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println(name + " Subscribed!");
            }
    
            @Override
            public void onNext(String s) {
                System.out.println(name + " onNext: " + s);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println(name + " error: " + e.getMessage());
            }
    
            @Override
            public void onComplete() {
                System.out.println(name + " Done!");
            }
        }
    
        private static void sleep(long seconds) {
            try {
                Thread.sleep(seconds * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    Leonardo Lima
    @leonardortlima
    Both of them printed ONE and hit onError for me.
    Rafael Guillen
    @rguillens

    Thanks for the feedback, I'm using Java 8 and RxJava 2.1.0 and this is my output:

    Without doOn* Operators*
    Without doOn* Subscribed!
    Without doOn* onNext: ONE
    Without doOn* onNext: THREE
    Without doOn* onNext: FOUR
    Without doOn* Done!
    With doOn* Operators
    With doOn* Subscribed!
    Event: ONE
    With doOn* onNext: ONE
    With doOn* error: Callable returned null

    With RxJava 2.1.4 works fine!!!

    Rafael Guillen
    @rguillens
    Yes, it was solved in Pull Request 5517, thanks a lot!
    Viktor Sinelnikov
    @asinel

    Hi, I have a problem. Chain of calls stops, and I don't understand why.

    RxActivityResult.on(activity)
                                    .startIntent(Intent(activity, Recaptcha2Activity::class.java))
                                    .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
                                    .subscribe({ }, { })

    this code works, I am getting "onNext" method called. But when I am putting it into chain, next "flatMap" isn't called

    .flatMap {
                        if(it.requiresCaptcha) {
                            RxActivityResult.on(activity)
                                    .startIntent(Intent(activity, Recaptcha2Activity::class.java))
                                    .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
                        } else {
                            Observable.just("")
                        }
                    }
                    .flatMap { api.publish(id!!, null) }

    What can be a reason? I see in debugger that RxActivityResult... starts, in deep of the library onNext method called, but it is not continued to next .flatMap
    Thank you for answers

    David Karnok
    @akarnokd
    You could add a doOnEvent({ println(it) }) before the last flatMap to see what events reach that point. Then start moving it up the chain and into the inner lambda.
    Viktor Sinelnikov
    @asinel
    I did, and there is no events
    but if it.requiresCaptcha is false, everything is ok
    David Karnok
    @akarnokd
    That means something gets lost inside RxActivityResult. I can't help you with that as it is a 3rd party component.
    Viktor Sinelnikov
    @asinel
    I exlored code of this library, used debugger, and I see that it calls subject.onNext(), the same as in first example.
    David Karnok
    @akarnokd
    What type of subject? If its a PublishSubject, then it is possible a signal happens before the top flatMap even subscribes to it.
    Viktor Sinelnikov
    @asinel
    Yes, it's PublishSubject
    David Karnok
    @akarnokd
    You can debug RxActivityResult and see if there is a subscriber to PublishSubject at that point or not.
    Viktor Sinelnikov
    @asinel
    @akarnokd I see 1 subscriber when subject calls onNext
    Also I tried to do a hack:
    .flatMap(object: Function<RequiresCaptchaResponse, Observable<String>> {
                        override fun apply(it: RequiresCaptchaResponse): Observable<String> {
                            val subject = PublishSubject.create<String>()
                            if (it.requiresCaptcha) {
                                RxActivityResult.on(activity)
                                        .startIntent(Intent(activity, Recaptcha2Activity::class.java))
                                        .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
                                        .subscribe({ subject.onNext(it) })
                                return subject
                            } else {
                                return Observable.just("")
                            }
                        }
                    })
    .flatMap { api.publishSendIt(id!!, it) }
    I am getting a callback in "subsribe", so it 100% calls subject.onNext
    But I don't get a call into next flatMap
    David Karnok
    @akarnokd
    Try ReplaySubject in your hack.
    Viktor Sinelnikov
    @asinel
    Tried, no success(
    David Karnok
    @akarnokd
    In that case, please provide a runnable application that demonstrates this problem.
    Amr Elmasry
    @AmrElmasry

    Hello, I have a question about the operators implementation,

    @Override
            public void onSubscribe(Disposable s) {
                if (DisposableHelper.validate(this.s, s)) {
                    this.s = s;
                    actual.onSubscribe(this);
                }
            }

    I understand that this line DisposableHelper.validate(this.s, s) is to assert that the value from upstream isn't null and it wasn't set before, but why in this line actual.onSubscribe(this); we passed this ? why not just passing s ? what is the need for implementing Disposable ?

    Exerosis
    @Exerosis
    Is there a version of RxJava 2 that uses java functional interfaces?
    David Karnok
    @akarnokd
    @AmrElmasry Many operators have to intercept dispose, but otherwise it is required by RxJava so operator fusion doesn't skip your operator by talking to the upstream's Disposable directly.
    @Exerosis No. RxJava has to run on Java 6 where those interfaces are not available. If there is going to be a RxJava 3 with Java 9, we may still use custom interfaces as the Java 8 functional interfaces don't allow throwing checked exceptions - it has become a convenience with RxJava 2 to be able to throw at will from lambdas.