Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Jacob Kristhammar
    @jacobk
    For completness here's the current version that actually works, didn't know about null not usable as values
    Single<RbmAccessToken> accessToken = accessTokens.get(agent);
    
    return Single.just(Optional.ofNullable(accessToken))
      .flatMap(t -> t.orElse(Single.error(new Throwable("no token"))))
      .flatMap(t -> t.isExpired()
          ? Single.error(new Throwable("token expired"))
          : Single.just(t))
      .onErrorResumeNext(t -> {
        Single<RbmAccessToken> newAccessToken = generateAccessToken(agent);
        accessTokens.put(agent, newAccessToken);
        return newAccessToken;
      });
    I feel somwhat ok with using error for the expire branch, but the initial null-check feels really contrieved
    Artem Rudometkin
    @perfectplayer
    From the docs: > Note that if you pass null to Just, it will return an Observable that emits null as an item. Do not make the mistake of assuming that this will return an empty Observable (one that emits no items at all). For that, you will need the Empty operator.
    Jacob Kristhammar
    @jacobk
    Doesn't seem to be true for javarx2 it errors out on null values
    @perfectplayer do you mean "overrx" as in trying to do too much with rx?
    Artem Rudometkin
    @perfectplayer
    Yes, but later code looks bit better. If it really works, I think you don't need to change anything
    Fyodor Sherstobitov
    @fsherstobitov

    Hi! Is there a way to pass the output of one Observable to the input of the second in RxJava2? My task is to get the list of users from server and then save it to the DB. I'm using Retrofit2 for API and Room as ORM on Android. Now I have this code

    api.userList()
                    .map(new Function<List<UserDto>, List<User>>() {
                        @Override
                        public List<User> apply(@NonNull List<UserDto> userDtos) throws Exception {
                            return userMapper.fromDtoList(userDtos);
                        }
                    })

    And I need to pass List<User> that I get from map to this method in DAO

    @Insert
        Completable insertAll(List<User> users);
    Fyodor Sherstobitov
    @fsherstobitov
    Found the answer - I must use flatMap to do that
    Benoit Lubek
    @BoD
    Hi!! I'm trying to find a way to reset a BehaviorSubject... Couldn't find a way - any thoughts?
    Binoy Balu
    @beingentangled
    @fsherstobitov Could you share an example?
    neuberfran
    @neuberfran
    what about rxfirebase?
    Neil Okamoto
    @gonewest818
    Does anyone here know of an example calling rxjava2 from clojure via java interop?
    dave08
    @dave08
    I'm accessing a db with vertx and caching the result in the observable so that all other observables can be zipped with its result. But I need to invalidate this cache. It's an api that tons of android devices are connecting to, the data I'm caching stays the same for all of them, but may be changed in the admin so vertx must reload it...
    I'm currently on RxJava 1.x...
    And Kotlin
    dave08
    @dave08
    It would have been nice to have a .cacheWhile operator for this...
    Ilya Gulya
    @IlyaGulya
    Hello everyone!
    How can i solve such problem.
    I have Observable that emits items randomly. (Button clicks, for example)
    I want it to emit first received item within some window (3 seconds, for example) immediately and then skip all other items that emitted during next 3 seconds. After 3 seconds, this behaviour should repeat.
    Something like this: https://codeshare.io/an4LbE
    Artem Rudometkin
    @perfectplayer
    Seems like you want to filter your items.
    Ilya Krol
    @ilya_krol_twitter

    Hi everyone! I am struggling to find a way to execute the command currently in the 'andThen' only when the upstream completable completes successfully without an error. Is there a better way other than saving the error state in a variable, which is quite ugly?

    someValue
        .toSingle()
        .flatMapCompletable { possiblyExceptionCompletable }
        .andThen(updateInteractor(success))
        .onErrorResumeNext { updateInteractor(failure) }

    In this scenario if possiblyExceptionCompletable throws an exception - both the success and failure updates occur.

    Exerosis
    @Exerosis
    if I were to do: test.switchMap(blah) when test completes does the most recent observable get unsubscribed?
    @ilya_krol_twitter
    value.flatMap(ignored -> opThatMightFail()
                                        .map(result -> new State(result)) //result is in
                                        .startWith(new State()) //Loading
                                        .onErrorReturn(throwable -> new State(throwable)))
               .doOnNext(state -> updateInteractor(state))
    Ilya Krol
    @ilya_krol_twitter

    @Exerosis thanks for the response, but this has the same effect, assuming that the outcome of the flatMap is a Completable and I use andThen instead of doOnNext. I could make it doOnNext (doOnComplete actually as the result is a Completable), and this is how i currently solved this, but the updateInteractor is a Completable also, so this breaks the stream somewhat as I subscribe to this different stream in the side effect operator. It works but it looks ugly and I wonder if there is a better option which keeps me within the same stream for all operations. as a clarification this is how my code looks right now (roughly):

    someValue
        .toSingle()
        .flatMapCompletable { possiblyExceptionCompletable }
        .doOnComplete { updateInteractor(success).subscribeOn(Schedulers.io()).subscribe() }
        .doOnError { updateInteractor(failure).subscribeOn(Schedulers.io()).subscribe() }

    I want to get rid of the two subscriptions mid-stream.

    Exerosis
    @Exerosis
    @ilya_krol_twitter May I ask what your ultimate goal is? And is there a reason you couldn't do someValue.flatMap(possiblyExceptionObservable).map(result -> interactorUpdate).toSingle().subOn(io).sub(this::updateInteractor). ? It's hard to fully understand what's happening here to some degree, might be able to provide a more meaningful answer with a few more details.
    Stas Shusha
    @journeyman
    Hi, could anyone please help me understand why this test fails?! In Rx.NET it works perfectly fine and seems logical :) And I couldnt find the explanation in the docs
        @Test
        fun test() {
            val connectable = Observable.just(1,2,3).replay(1)
            val testSubscriber = TestSubscriber.create<Int>()
    
            connectable.connect()
            connectable.subscribe(testSubscriber)
    
            testSubscriber.assertReceivedOnNext(listOf(3))
        }
    the actual result is all 3 items: [1,2,3]
    David Karnok
    @akarnokd
    Because of backpressure. replay() won't request unless there is a consumer for it.
    Stas Shusha
    @journeyman
    found an answer: related to backpressure changes in rxjava 1.0.14
    @akarnokd oh, thankx)
    David Karnok
    @akarnokd
    You are welcome, twice I guess ;)
    Stas Shusha
    @journeyman
    sure)
    though you haven't propose any workarounds
        fun <T> Observable<T>.replayForced() : ConnectableObservable<T> {
            val upstream = this.replay()
            val sub = upstream.subscribe()
            upstream.doOnUnsubscribe { sub.unsubscribe() }
            return upstream
        }
    holydragon2009
    @holydragon2009
    I have 5 retrofit observables. After finished each of observables, keep running 5 observables to write on local database. Then I need to receive all of results from those observables and subscribe to UI thread later. How to do that? I am a newbie with RxJava. Please help me.
    Yuriy Tim
    @tim4dev

    Hi. I want to wrap a listener to Observable.

    Full code here https://github.com/tim4dev/dirty_code/blob/master/Rx-Listener-to-Observable/app/src/main/java/tim4dev/com/rxlistenertoobservable/MainActivity.java

    On Android 4.2 "Observable Listener" (Observable, Flowable) worked fine.
    On Android 8.0 "Observable Listener" (Observable, Flowable) called 2--3 times and dead.

    Steps to reproduce the problem:

    • set PHONE_NUMBER to your own phone (yes, you will phone call to yourself)
    • run app on Android 8
    • call
    • press disconnect (off hook)
    • if you do it fast enough - everything will be ok
    • if you listen to short beeps and wait, the Listener dies and does not react any more.

    Question: what to do?

    Joshua Street
    @jjstreet
    ahoy mateys
    im struggling to interleave multiple flowables
    into a single flowable
    i ultimately end up with one flowable emitting everything, then another one emitting
    i used flatMap
    but its not really working right
    Joshua Street
    @jjstreet
    bah.. i just did a small sample.. and the sample works just fine
    Joshua Street
    @jjstreet
    if i converted each flowable into a completable view flatMapCompletable and used Completable.amb(), the interleaving works just fine.
    so goofy
    David Karnok
    @akarnokd
    @tim4dev I see two problems:
    1) you didn't initialize mDisposable
    2) the emitter can track only one resource, thus calling setCancellable and setDisposable together will cancel/dispose the previous resource. Use a CompositeDisposable, add resources to it and then call setDisposable() once.
    @jjstreet flatMap doesn't guarantee interleaving and otherwise it depends on the emission frequency of the sources. An attempt at interleaving two range() will not work with flatMap. There is an extension operator that lets you merge streams in a round-robin fashion which can result a better interleaving, still no 100% guarantee.
    Yuriy Tim
    @tim4dev

    @akarnokd

    @tim4dev I see two problems:
    1) you didn't initialize [mDisposable]

    I apologize for that. But this code was commented out.
    I've tried a lot of things really.
    Here is the actual code: https://gist.github.com/tim4dev/e7b207ff85f1798bd10fe6f6b2f0514c

    Yuriy Tim
    @tim4dev
    I was wondering why this does not work only on Android 8.0