RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Hi! Is there a way to pass the output of one Observable to the input of a second one in RxJava2? My task is to get the list of users from the server and then save it to the DB. I'm using Retrofit2 for the API and Room as the ORM on Android. Right now I have this code:
api.userList()
    .map(new Function<List<UserDto>, List<User>>() {
        @Override
        public List<User> apply(@NonNull List<UserDto> userDtos) throws Exception {
            return userMapper.fromDtoList(userDtos);
        }
    })
And I need to pass the List<User> that I get from map to this method in the DAO:
@Insert
Completable insertAll(List<User> users);
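One way to connect the two, sketched under assumptions: since insertAll returns a Completable, the mapped list can be handed to it with flatMapCompletable. The sketch assumes api.userList() returns a Single<List<UserDto>> (the chat does not say which type it is). FakeUserDao, userList() and fromDtoList() are hypothetical stand-ins for the Room DAO, the Retrofit call and userMapper.

```java
import io.reactivex.Completable;
import io.reactivex.Single;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SaveUsersSketch {
    // Hypothetical stand-ins for the Retrofit DTO and the Room entity.
    static class UserDto { final String name; UserDto(String name) { this.name = name; } }
    static class User { final String name; User(String name) { this.name = name; } }

    // Hypothetical stand-in for the Room DAO; it only records what was "inserted".
    static class FakeUserDao {
        final List<User> saved = new ArrayList<>();

        Completable insertAll(List<User> users) {
            return Completable.fromAction(() -> saved.addAll(users));
        }
    }

    // Assumption: the API call returns a Single.
    static Single<List<UserDto>> userList() {
        return Single.just(Arrays.asList(new UserDto("ann"), new UserDto("bob")));
    }

    // Stand-in for userMapper.fromDtoList.
    static List<User> fromDtoList(List<UserDto> dtos) {
        List<User> users = new ArrayList<>();
        for (UserDto dto : dtos) {
            users.add(new User(dto.name));
        }
        return users;
    }

    // map converts the DTOs; flatMapCompletable passes the result to the DAO
    // and subscribes to the Completable it returns.
    static Completable fetchAndSave(FakeUserDao dao) {
        return userList()
            .map(SaveUsersSketch::fromDtoList)
            .flatMapCompletable(dao::insertAll);
    }

    public static void main(String[] args) {
        FakeUserDao dao = new FakeUserDao();
        fetchAndSave(dao).blockingAwait();
        System.out.println("saved users: " + dao.saved.size());
    }
}
```

Nothing runs until the returned Completable is subscribed to, so the caller still decides on schedulers and error handling.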
Observable that emits items randomly. (Button clicks, for example)
Hi everyone! I am struggling to find a way to execute the command currently in the 'andThen' only when the upstream completable completes successfully without an error. Is there a better way other than saving the error state in a variable, which is quite ugly?
someValue
    .toSingle()
    .flatMapCompletable { possiblyExceptionCompletable }
    .andThen(updateInteractor(success))
    .onErrorResumeNext { updateInteractor(failure) }
In this scenario, if possiblyExceptionCompletable throws an exception, both the success and failure updates occur.
value.flatMap(ignored -> opThatMightFail()
        .map(result -> new State(result)) // result is in
        .startWith(new State()) // Loading
        .onErrorReturn(throwable -> new State(throwable)))
    .doOnNext(state -> updateInteractor(state))
@Exerosis thanks for the response, but this has the same effect, assuming that the outcome of the flatMap is a Completable and I use andThen instead of doOnNext. I could make it doOnNext (doOnComplete actually, as the result is a Completable), and this is how I currently solved this, but updateInteractor is also a Completable, so this breaks the stream somewhat, as I subscribe to a different stream inside the side-effect operator. It works, but it looks ugly, and I wonder if there is a better option that keeps me within the same stream for all operations. As a clarification, this is roughly how my code looks right now:
someValue
    .toSingle()
    .flatMapCompletable { possiblyExceptionCompletable }
    .doOnComplete { updateInteractor(success).subscribeOn(Schedulers.io()).subscribe() }
    .doOnError { updateInteractor(failure).subscribeOn(Schedulers.io()).subscribe() }
I want to get rid of the two subscriptions mid-stream.
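A possible way to stay in a single stream, offered as a sketch rather than a definitive answer: turn the outcome of the Completable into a value first, then run exactly one update on that value. toSingleDefault, onErrorReturnItem and flatMapCompletable are RxJava 2 operators; the Outcome enum, the log list and this version of updateInteractor are hypothetical stand-ins for the code in the question.

```java
import io.reactivex.Completable;
import java.util.ArrayList;
import java.util.List;

final class OutcomeSketch {
    // Hypothetical replacement for the success/failure arguments.
    enum Outcome { SUCCESS, FAILURE }

    // Hypothetical stand-in for updateInteractor: records the reported outcome.
    static Completable updateInteractor(List<Outcome> log, Outcome outcome) {
        return Completable.fromAction(() -> log.add(outcome));
    }

    static Completable runThenReport(Completable possiblyFailing, List<Outcome> log) {
        return possiblyFailing
            // Completion becomes a SUCCESS item; an error still travels as an error.
            .toSingleDefault(Outcome.SUCCESS)
            // An upstream error becomes a FAILURE item instead.
            .onErrorReturnItem(Outcome.FAILURE)
            // Exactly one update runs, inside the same stream.
            .flatMapCompletable(outcome -> updateInteractor(log, outcome));
    }

    public static void main(String[] args) {
        List<Outcome> log = new ArrayList<>();
        runThenReport(Completable.complete(), log).blockingAwait();
        runThenReport(Completable.error(new IllegalStateException("boom")), log).blockingAwait();
        System.out.println(log);
    }
}
```

Because the update sits after onErrorReturnItem, an error thrown by the update itself goes to the subscriber and does not trigger a second, failure update.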
@Test
fun test() {
    val connectable = Observable.just(1, 2, 3).replay(1)
    val testSubscriber = TestSubscriber.create<Int>()
    connectable.connect()
    connectable.subscribe(testSubscriber)
    testSubscriber.assertReceivedOnNext(listOf(3))
}

fun <T> Observable<T>.replayForced(): ConnectableObservable<T> {
    val upstream = this.replay()
    val sub = upstream.subscribe()
    upstream.doOnUnsubscribe { sub.unsubscribe() }
    return upstream
}
Hi. I want to wrap a listener in an Observable.
On Android 4.2 the "Observable Listener" (Observable, Flowable) worked fine.
On Android 8.0 the "Observable Listener" (Observable, Flowable) is called 2-3 times and then dies.
Steps to reproduce the problem:
Question: what to do?
setCancellable and setDisposable together will cancel/dispose the previous resource. Use a CompositeDisposable, add resources to it and then call setDisposable() once.
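The advice above could look roughly like this. It is a sketch under assumptions: Source and Listener are hypothetical stand-ins for the callback API being wrapped, while Observable.create, CompositeDisposable, Disposables.fromAction and setDisposable are regular RxJava 2 API.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposables;

final class ListenerWrapSketch {
    // Hypothetical callback-based API.
    interface Listener { void onEvent(String value); }

    static class Source {
        Listener listener;
        boolean closed;

        void setListener(Listener listener) { this.listener = listener; }
        void close() { closed = true; }
        void fire(String value) {
            if (listener != null) {
                listener.onEvent(value);
            }
        }
    }

    static Observable<String> events(Source source) {
        return Observable.create(emitter -> {
            // Collect every cleanup action in one container...
            CompositeDisposable resources = new CompositeDisposable();
            resources.add(Disposables.fromAction(() -> source.setListener(null)));
            resources.add(Disposables.fromAction(source::close));
            // ...and hand it to the emitter once, so no resource replaces another.
            emitter.setDisposable(resources);
            source.setListener(emitter::onNext);
        });
    }
}
```

Disposing the subscription then runs both cleanup actions, instead of the second registration disposing the first.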
flatMap doesn't guarantee interleaving and otherwise it depends on the emission frequency of the sources. An attempt at interleaving two range() will not work with flatMap. There is an extension operator that lets you merge streams in a round-robin fashion, which can result in better interleaving, but still with no 100% guarantee.
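A small sketch of the range() case, assuming both sources are synchronous and no schedulers are involved. The class and method names are made up for illustration.

```java
import io.reactivex.Observable;
import java.util.List;

final class FlatMapOrderSketch {
    // Merges two synchronous ranges through flatMap and collects the result.
    static List<Integer> mergedRanges() {
        return Observable.just(Observable.range(1, 3), Observable.range(101, 3))
            .flatMap(source -> source)
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        // The first range is emitted completely before the second one starts.
        System.out.println(mergedRanges());
    }
}
```

With synchronous sources like these, each inner range runs to completion as soon as flatMap subscribes to it, so the two sequences come out back to back rather than alternating.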
@akarnokd
@tim4dev I see two problems:
1) you didn't initialize [mDisposable]
I apologize for that. But this code was commented out.
I've tried a lot of things really.
Here is the actual code: https://gist.github.com/tim4dev/e7b207ff85f1798bd10fe6f6b2f0514c
When I use Observable.fromArray(array).flatMap(element -> fromCallable(() -> element.value)) and element.value is null, flatMap swallows the error and continues, but if I change just this: fromCallable(() -> element.value).doOn*(), it throws the exception.
I'm wondering if this is the expected RxJava 2 behavior. Here is the code:
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class FlatMapExample {

    static class Pair<K, V> {
        private final K key;
        private final V value;

        Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public K getKey() {
            return key;
        }

        public V getValue() {
            return value;
        }
    }

    public static void main(String[] args) {
        Pair<String, String> ONE = new Pair<>("1", "ONE");
        Pair<String, String> TWO = new Pair<>("2", null);
        Pair<String, String> THREE = new Pair<>("3", "THREE");
        Pair<String, String> FOUR = new Pair<>("4", "FOUR");
        Pair[] array = {ONE, TWO, THREE, FOUR};
        withoutOnXOperator(array);
        sleep(5L);
        withOnXOperator(array); // Throws an error
        sleep(5L);
    }

    /**
     * Throws errors.
     */
    private static void withOnXOperator(Pair[] array) {
        System.out.println("With doOn* Operators");
        Observable.fromArray(array)
            .flatMap(
                pair -> Observable.fromCallable(() -> pair.getValue())
                    .doOnNext(notification -> System.out.println("Event: " + notification)) // Apply doOn* operator
            )
            .cast(String.class)
            .subscribe(new PrintObserver("With doOn*"));
    }

    /**
     * Doesn't throw errors.
     */
    private static void withoutOnXOperator(Pair[] array) {
        System.out.println("Without doOn* Operators*");
        Observable.fromArray(array)
            .flatMap(pair -> Observable.fromCallable(() -> pair.getValue()))
            .cast(String.class)
            .subscribe(new PrintObserver("Without doOn*"));
    }

    static class PrintObserver implements Observer<String> {
        private final String name;

        PrintObserver(String name) {
            this.name = name;
        }

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println(name + " Subscribed!");
        }

        @Override
        public void onNext(String s) {
            System.out.println(name + " onNext: " + s);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(name + " error: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println(name + " Done!");
        }
    }

    private static void sleep(long seconds) {
        try {
            Thread.sleep(seconds * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
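If the goal in the example above is simply to skip pairs whose value is null, one option that does not rely on how flatMap treats a null-returning Callable is Maybe: Maybe.fromCallable is documented to complete without a value when the Callable returns null, and flatMapMaybe merges the results. This is a sketch, not an answer to whether the observed difference is intended; the class name, samplePairs() and the use of Map.Entry in place of Pair are assumptions made for brevity.

```java
import io.reactivex.Maybe;
import io.reactivex.Observable;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class NullableValuesSketch {
    // Same data as in the question, with Map.Entry standing in for Pair.
    static List<Map.Entry<String, String>> samplePairs() {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<>("1", "ONE"));
        pairs.add(new SimpleEntry<>("2", null));
        pairs.add(new SimpleEntry<>("3", "THREE"));
        pairs.add(new SimpleEntry<>("4", "FOUR"));
        return pairs;
    }

    static List<String> values(List<Map.Entry<String, String>> pairs) {
        return Observable.fromIterable(pairs)
            // A null value turns into an empty Maybe, with or without the doOn* call.
            .flatMapMaybe(pair -> Maybe.fromCallable(pair::getValue)
                .doOnSuccess(value -> System.out.println("Event: " + value)))
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(values(samplePairs()));
    }
}
```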