RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Person person = new Person();
Observable<Person> personObservable = Observable.just(person).subscribeOn(Schedulers.io());
personObservable.subscribe(p -> {
    // add the object to the database
});
// gets the data and returns it
Single<RbmAccessToken> accessToken = accessTokens.get(agent);
return Single.just(accessToken)
    .flatMap(t -> t == null
        ? Single.error(new Throwable("no token"))
        : t)
    .flatMap(t -> Instant.now().plusSeconds(60).isAfter(t.expiresAt())
        ? Single.error(new Throwable("token expired"))
        : Single.just(t))
    .onErrorResumeNext(t -> {
        Single<RbmAccessToken> newAccessToken = getAccessToken(agent);
        accessTokens.put(agent, newAccessToken);
        return newAccessToken;
    });
I had the following before:
if (accessToken != null && accessToken.blockingGet().expiresAt().isAfter(Instant.now().plusSeconds(60))) {
    return accessToken;
}
Single<RbmAccessToken> newAccessToken = getAccessToken(agent);
accessTokens.put(agent, newAccessToken);
return newAccessToken;
and was trying to get rid of the blocking get:
Single<RbmAccessToken> accessToken = accessTokens.get(agent);
return Single.just(Optional.ofNullable(accessToken))
    .flatMap(t -> t.orElse(Single.error(new Throwable("no token"))))
    .flatMap(t -> t.isExpired()
        ? Single.error(new Throwable("token expired"))
        : Single.just(t))
    .onErrorResumeNext(t -> {
        Single<RbmAccessToken> newAccessToken = generateAccessToken(agent);
        accessTokens.put(agent, newAccessToken);
        return newAccessToken;
    });
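For what it's worth, the same lookup can probably be written without the Optional wrapper by choosing the source Single up front inside Single.defer. This is only a sketch under assumptions: Token stands in for RbmAccessToken, generateAccessToken here is a hypothetical fake that counts calls, and lifetimeSeconds exists purely to make the example testable. Note that onErrorResumeNext also catches real failures from the cached Single (not only the two sentinel errors), which may or may not be what you want.

```java
import io.reactivex.Single;
import java.time.Instant;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TokenCacheSketch {
    // Hypothetical stand-in for RbmAccessToken.
    static final class Token {
        final int id;
        final Instant expiresAt;
        Token(int id, Instant expiresAt) { this.id = id; this.expiresAt = expiresAt; }
        // Treat tokens that expire within the next 60 seconds as expired.
        boolean isExpired() { return Instant.now().plusSeconds(60).isAfter(expiresAt); }
    }

    static final Map<String, Single<Token>> accessTokens = new ConcurrentHashMap<>();
    static final AtomicInteger generated = new AtomicInteger();
    static volatile long lifetimeSeconds = 3600; // assumption, for the example only

    // Hypothetical fake of generateAccessToken(agent); cache() makes every
    // subscriber of the stored Single share one generated token.
    static Single<Token> generateAccessToken(String agent) {
        return Single.fromCallable(() ->
                new Token(generated.incrementAndGet(),
                          Instant.now().plusSeconds(lifetimeSeconds)))
            .cache();
    }

    static Single<Token> accessToken(String agent) {
        // defer: the map lookup happens on subscription, not at assembly time.
        return Single.defer(() -> {
            Single<Token> cached = accessTokens.get(agent);
            Single<Token> source = cached != null
                ? cached
                : Single.<Token>error(new NoSuchElementException("no token"));
            return source
                .flatMap(t -> t.isExpired()
                    ? Single.<Token>error(new IllegalStateException("token expired"))
                    : Single.just(t))
                .onErrorResumeNext(e -> {
                    Single<Token> fresh = generateAccessToken(agent);
                    accessTokens.put(agent, fresh);
                    return fresh;
                });
        });
    }
}
```

Nothing blocks inside accessToken; the caller decides where to subscribe.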
Hi! Is there a way to pass the output of one Observable to the input of a second one in RxJava2? My task is to get the list of users from the server and then save it to the DB. I'm using Retrofit2 for the API and Room as the ORM on Android. Right now I have this code:
api.userList()
    .map(new Function<List<UserDto>, List<User>>() {
        @Override
        public List<User> apply(@NonNull List<UserDto> userDtos) throws Exception {
            return userMapper.fromDtoList(userDtos);
        }
    })
And I need to pass the List<User> that I get from map to this method in the DAO:
@Insert
Completable insertAll(List<User> users);
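One way this is usually done is flatMapCompletable, which hands each emitted value to a function that returns a Completable. A minimal sketch, assuming api.userList() returns a Single (Observable has flatMapCompletable as well); userList, insertAll, and the in-memory db list below are hypothetical stand-ins for the Retrofit call and the Room DAO.

```java
import io.reactivex.Completable;
import io.reactivex.Single;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SaveUsersSketch {
    static final class UserDto {
        final String name;
        UserDto(String name) { this.name = name; }
    }

    static final class User {
        final String name;
        User(String name) { this.name = name; }
    }

    // Hypothetical stand-in for the Retrofit call api.userList().
    static Single<List<UserDto>> userList() {
        return Single.just(Arrays.asList(new UserDto("ann"), new UserDto("bob")));
    }

    // Hypothetical stand-in for the database table.
    static final List<User> db = new ArrayList<>();

    // Hypothetical stand-in for the DAO's insertAll(List<User>).
    static Completable insertAll(List<User> users) {
        return Completable.fromAction(() -> db.addAll(users));
    }

    static Completable fetchAndSave() {
        return userList()
            // DTOs -> entities, like userMapper.fromDtoList in the question
            .map(dtos -> dtos.stream()
                .map(dto -> new User(dto.name))
                .collect(Collectors.toList()))
            // pass the mapped list straight into the insert Completable
            .flatMapCompletable(SaveUsersSketch::insertAll);
    }
}
```

The result is a single Completable, so one subscribe (with subscribeOn(Schedulers.io()) on Android) covers both the network call and the insert.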
Observable that emits items randomly (button clicks, for example).
Hi everyone! I am struggling to find a way to execute the command currently in the 'andThen' only when the upstream completable completes successfully without an error. Is there a better way other than saving the error state in a variable, which is quite ugly?
someValue
    .toSingle()
    .flatMapCompletable { possiblyExceptionCompletable }
    .andThen(updateInteractor(success))
    .onErrorResumeNext { updateInteractor(failure) }
In this scenario, if possiblyExceptionCompletable throws an exception, both the success and failure updates occur.
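One possible cause, assuming updateInteractor does its work when it is called rather than when the returned Completable is subscribed to: the argument to andThen is evaluated while the chain is being assembled, so updateInteractor(success) runs regardless of how the upstream ends. Wrapping it in Completable.defer postpones the call until the upstream has completed. In this sketch updateInteractor is a hypothetical stand-in that deliberately has an eager side effect.

```java
import io.reactivex.Completable;
import java.util.ArrayList;
import java.util.List;

public class AndThenDeferSketch {
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-in: the side effect happens at call time,
    // not at subscription time.
    static Completable updateInteractor(String state) {
        log.add(state);
        return Completable.complete();
    }

    static List<String> run(Completable upstream) {
        log.clear();
        upstream
            // defer: updateInteractor("success") is only called once
            // the upstream has completed without an error
            .andThen(Completable.defer(() -> updateInteractor("success")))
            // the lambda is only invoked when an error actually arrives
            .onErrorResumeNext(e -> updateInteractor("failure"))
            .blockingAwait();
        return new ArrayList<>(log);
    }
}
```

Because onErrorResumeNext sits after andThen, a failure inside the success update would still trigger the failure update; that ordering is kept from the original chain.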
value.flatMap(ignored -> opThatMightFail()
        .map(result -> new State(result)) // result is in
        .startWith(new State()) // loading
        .onErrorReturn(throwable -> new State(throwable)))
    .doOnNext(state -> updateInteractor(state))
@Exerosis thanks for the response, but this has the same effect, assuming that the outcome of the flatMap is a Completable and I use andThen instead of doOnNext. I could make it doOnNext (doOnComplete, actually, since the result is a Completable), and this is how I currently solved it, but updateInteractor is also a Completable, so this breaks the stream somewhat because I subscribe to a different stream inside the side-effect operator. It works, but it looks ugly, and I wonder if there is a better option that keeps me within the same stream for all operations. As a clarification, this is roughly how my code looks right now:
someValue
    .toSingle()
    .flatMapCompletable { possiblyExceptionCompletable }
    .doOnComplete { updateInteractor(success).subscribeOn(Schedulers.io()).subscribe() }
    .doOnError { updateInteractor(failure).subscribeOn(Schedulers.io()).subscribe() }
I want to get rid of the two subscriptions mid-stream.
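A possible way to stay in one stream is to turn the outcome of the Completable into a value first and then call updateInteractor exactly once through flatMapCompletable. This is a sketch, not a drop-in fix: the "success" and "failure" strings and the updateInteractor below are hypothetical stand-ins, and the original error is swallowed, so the resulting Completable completes even when the upstream failed.

```java
import io.reactivex.Completable;
import java.util.ArrayList;
import java.util.List;

public class OutcomeToStateSketch {
    static final List<String> updates = new ArrayList<>();

    // Hypothetical stand-in for the real updateInteractor; it records
    // the state only when the returned Completable is subscribed to.
    static Completable updateInteractor(String state) {
        return Completable.fromAction(() -> updates.add(state));
    }

    static Completable runAndReport(Completable possiblyExceptionCompletable) {
        return possiblyExceptionCompletable
            .toSingleDefault("success")   // completion becomes a value
            .onErrorReturnItem("failure") // an upstream error becomes a value too
            // one inner Completable, no nested subscribe() calls
            .flatMapCompletable(OutcomeToStateSketch::updateInteractor);
    }
}
```

Since onErrorReturnItem is placed before flatMapCompletable, an error thrown by the success update itself is propagated to the subscriber instead of triggering the failure update.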
@Test
fun test() {
    val connectable = Observable.just(1, 2, 3).replay(1)
    val testSubscriber = TestSubscriber.create<Int>()
    connectable.connect()
    connectable.subscribe(testSubscriber)
    testSubscriber.assertReceivedOnNext(listOf(3))
}