RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Person person = new Person();
Observable<Person> personObservable = Observable.just(person).subscribeOn(Schedulers.io());
personObservable.subscribe(p -> {
// add the object to the database
});
// gets the data and return it
Single<RbmAccessToken> accessToken = accessTokens.get(agent);
return Single.just(accessToken)
.flatMap(t -> t == null
? Single.error(new Throwable("no token"))
: t)
.flatMap(t -> Instant.now().plusSeconds(60).isAfter(t.expiresAt())
? Single.error(new Throwable("token expired"))
: Single.just(t))
.onErrorResumeNext(t -> {
Single<RbmAccessToken> newAccessToken = getAccessToken(agent);
accessTokens.put(agent, newAccessToken);
return newAccessToken;
});
I had the following before:
if (accessToken != null && accessToken.blockingGet().expiresAt().isAfter(Instant.now().plusSeconds(60))) {
return accessToken;
}
Single<RbmAccessToken> newAccessToken = getAccessToken(agent);
accessTokens.put(agent, newAccessToken);
return newAccessToken;
and was trying to get rid of the blocking get
Single<RbmAccessToken> accessToken = accessTokens.get(agent);
return Single.just(Optional.ofNullable(accessToken))
.flatMap(t -> t.orElse(Single.error(new Throwable("no token"))))
.flatMap(t -> t.isExpired()
? Single.error(new Throwable("token expired"))
: Single.just(t))
.onErrorResumeNext(t -> {
Single<RbmAccessToken> newAccessToken = generateAccessToken(agent);
accessTokens.put(agent, newAccessToken);
return newAccessToken;
});
Hi! Is there a way to pass the output of one Observable to the input of the second in RxJava2? My task is to get the list of users from server and then save it to the DB. I'm using Retrofit2 for API and Room as ORM on Android. Now I have this code
api.userList()
.map(new Function<List<UserDto>, List<User>>() {
@Override
public List<User> apply(@NonNull List<UserDto> userDtos) throws Exception {
return userMapper.fromDtoList(userDtos);
}
})
And I need to pass List<User>
that I get from map
to this method in DAO
@Insert
Completable insertAll(List<User> users);
Observable
that emits items randomly. (Button clicks, for example)