RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
@Exerosis thanks for the response, but this has the same effect, assuming that the outcome of the flatMap is a Completable and I use andThen instead of doOnNext. I could make it doOnNext (doOnComplete, actually, as the result is a Completable), and this is how I currently solved it, but the updateInteractor is also a Completable, so this breaks the stream somewhat, as I subscribe to that separate stream inside the side-effect operator. It works, but it looks ugly, and I wonder if there is a better option that keeps me within the same stream for all operations. As a clarification, this is how my code looks right now (roughly):
someValue
    .toSingle()
    .flatMapCompletable { possiblyExceptionCompletable }
    .doOnComplete { updateInteractor(success).subscribeOn(Schedulers.io()).subscribe() }
    .doOnError { updateInteractor(failure).subscribeOn(Schedulers.io()).subscribe() }
I want to get rid of the two subscriptions mid-stream.
@Test
fun test() {
    val connectable = Observable.just(1, 2, 3).replay(1)
    val testSubscriber = TestSubscriber.create<Int>()
    connectable.connect()
    connectable.subscribe(testSubscriber)
    testSubscriber.assertReceivedOnNext(listOf(3))
}
fun <T> Observable<T>.replayForced(): ConnectableObservable<T> {
    val upstream = this.replay()
    val sub = upstream.subscribe()
    upstream.doOnUnsubscribe { sub.unsubscribe() }
    return upstream
}
Hi. I want to wrap a listener in an Observable.
On Android 4.2 the "Observable Listener" (Observable, Flowable) worked fine.
On Android 8.0 the "Observable Listener" (Observable, Flowable) is called 2-3 times and then dies.
Steps to reproduce the problem:
Question: what to do?
setCancellable and setDisposable together will cancel/dispose the previous resource. Use a CompositeDisposable, add resources to it and then call setDisposable() once.
flatMap doesn't guarantee interleaving and otherwise it depends on the emission frequency of the sources. An attempt at interleaving two range() will not work with flatMap. There is an extension operator that lets you merge streams in a round-robin fashion, which can result in better interleaving, but still with no 100% guarantee.
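To make the round-robin ordering mentioned above concrete, here is a minimal sketch over plain Java iterators. It is not the extension operator and does not use RxJava at all; the names RoundRobinSketch and roundRobin are hypothetical and only illustrate the order in which items would be taken, one from each source in turn. Real asynchronous sources emit at their own pace, which is why an operator cannot give the same hard guarantee.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only: synchronous round-robin over iterators,
// not an RxJava operator.
class RoundRobinSketch {

    /** Takes one item from each source in turn until every source is exhausted. */
    static <T> List<T> roundRobin(List<? extends Iterable<? extends T>> sources) {
        Deque<Iterator<? extends T>> queue = new ArrayDeque<>();
        for (Iterable<? extends T> source : sources) {
            queue.add(source.iterator());
        }
        List<T> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            Iterator<? extends T> it = queue.poll();
            if (it.hasNext()) {
                out.add(it.next());
                queue.add(it); // back of the line for its next turn
            }
            // an exhausted iterator is simply not re-queued
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> first = Arrays.asList(1, 2, 3);
        List<Integer> second = Arrays.asList(101, 102, 103, 104, 105);
        // prints [1, 101, 2, 102, 3, 103, 104, 105]
        System.out.println(roundRobin(Arrays.asList(first, second)));
    }
}
```

Once the shorter source runs out, the remaining items of the longer one simply follow in order.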
@akarnokd
@tim4dev I see two problems:
1) you didn't initialize [mDisposable]
I apologize for that. But this code was commented out.
I've tried a lot of things really.
Here is the actual code: https://gist.github.com/tim4dev/e7b207ff85f1798bd10fe6f6b2f0514c
When I use Observable.fromArray(array).flatMap(element -> fromCallable(() -> element.value)) and element.value is null, flatMap swallows the error and continues, but if I change just this: fromCallable(() -> element.value).doOn*(), it throws the exception.
I'm wondering if this is the expected RxJava 2 behavior. Here is the code:
public class FlatMapExample {
    static class Pair<K extends Object, V extends Object> {
        private final K key;
        private final V value;

        Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public K getKey() {
            return key;
        }

        public V getValue() {
            return value;
        }
    }

    public static void main(String[] args) {
        Pair<String, String> ONE = new Pair<>("1", "ONE");
        Pair<String, String> TWO = new Pair<>("2", null);
        Pair<String, String> THREE = new Pair<>("3", "THREE");
        Pair<String, String> FOUR = new Pair<>("4", "FOUR");
        Pair[] array = {ONE, TWO, THREE, FOUR};
        withoutOnXOperator(array);
        sleep(5L);
        withOnXOperator(array); // Throws error
        sleep(5L);
    }

    /**
     * Throws errors
     */
    private static void withOnXOperator(Pair[] array) {
        System.out.println("With doOn* Operators");
        Observable.fromArray(array)
                .flatMap(
                        pair -> Observable.fromCallable(() -> pair.getValue())
                                .doOnNext(notification -> System.out.println("Event: " + notification)) // Apply doOn* operator
                )
                .cast(String.class)
                .subscribe(new PrintObserver("With doOn*"));
    }

    /**
     * Doesn't throw errors
     */
    private static void withoutOnXOperator(Pair[] array) {
        System.out.println("Without doOn* Operators*");
        Observable.fromArray(array)
                .flatMap(pair -> Observable.fromCallable(() -> pair.getValue()))
                .cast(String.class)
                .subscribe(new PrintObserver("Without doOn*"));
    }

    static class PrintObserver implements Observer<String> {
        private final String name;

        PrintObserver(String name) {
            this.name = name;
        }

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println(name + " Subscribed!");
        }

        @Override
        public void onNext(String s) {
            System.out.println(name + " onNext: " + s);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(name + " error: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println(name + " Done!");
        }
    }

    private static void sleep(long seconds) {
        try {
            Thread.sleep(seconds * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Thanks for the feedback, I'm using Java 8 and RxJava 2.1.0 and this is my output:
Without doOn* Operators*
Without doOn* Subscribed!
Without doOn* onNext: ONE
Without doOn* onNext: THREE
Without doOn* onNext: FOUR
Without doOn* Done!
With doOn* Operators
With doOn* Subscribed!
Event: ONE
With doOn* onNext: ONE
With doOn* error: Callable returned null
With RxJava 2.1.4 it works fine!
Hi, I have a problem. The chain of calls stops, and I don't understand why.
RxActivityResult.on(activity)
    .startIntent(Intent(activity, Recaptcha2Activity::class.java))
    .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH) ?: "" }
    .subscribe({ }, { })
This code works: the "onNext" method gets called. But when I put it into a chain, the next "flatMap" isn't called:
.flatMap {
    if (it.requiresCaptcha) {
        RxActivityResult.on(activity)
            .startIntent(Intent(activity, Recaptcha2Activity::class.java))
            .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH) ?: "" }
    } else {
        Observable.just("")
    }
}
.flatMap { api.publish(id!!, null) }
What could be the reason? I can see in the debugger that RxActivityResult... starts and that, deep inside the library, the onNext method is called, but the chain does not continue to the next .flatMap
Thank you for any answers.
.flatMap(object : Function<RequiresCaptchaResponse, Observable<String>> {
    override fun apply(it: RequiresCaptchaResponse): Observable<String> {
        val subject = PublishSubject.create<String>()
        if (it.requiresCaptcha) {
            RxActivityResult.on(activity)
                .startIntent(Intent(activity, Recaptcha2Activity::class.java))
                .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH) ?: "" }
                .subscribe({ subject.onNext(it) })
            return subject
        } else {
            return Observable.just("")
        }
    }
})
.flatMap { api.publishSendIt(id!!, it) }