RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
When I use Observable.fromArray(array).flatMap(element-> fromCallable(() -> element.value)
and element.value
is null flatMap
swalows the error and continues, but if I change just this: fromCallable(() -> element.value).doOn*()
throws the exception.
I'm wondering if this is the RxJava 2 expected behavior, here is the code:
public class FlatMapExample {
static class Pair<K extends Object, V extends Object> {
private final K key;
private final V value;
Pair(K key, V value) {
this.key = key;
this.value = value;
}
public K getKey() {
return key;
}
public V getValue() {
return value;
}
}
public static void main(String[] args) {
Pair<String, String> ONE = new Pair<>("1", "ONE");
Pair<String, String> TWO = new Pair<>("2", null);
Pair<String, String> THREE = new Pair<>("3", "THREE");
Pair<String, String> FOUR = new Pair<>("4", "FOUR");
Pair[] array = {ONE, TWO, THREE, FOUR};
withoutOnXOperator(array);
sleep(5L);
withOnXOperator(array); //Throw error
sleep(5L);
}
/**
/* Throw errors
*/
private static void withOnXOperator(Pair[] array) {
System.out.println("With doOn* Operators");
Observable.fromArray(array)
.flatMap(
pair -> Observable.fromCallable(() -> pair.getValue())
.doOnNext(notification -> System.out.println("Event: " + notification)) //Apply doOn* operator
)
.cast(String.class)
.subscribe(new PrintObserver("With doOn*"));
}
/**
/* Doesn't throw errors
*/
private static void withoutOnXOperator(Pair[] array) {
System.out.println("Without doOn* Operators*");
Observable.fromArray(array)
.flatMap(pair -> Observable.fromCallable(() -> pair.getValue()))
.cast(String.class)
.subscribe(new PrintObserver("Without doOn*"));
}
static class PrintObserver implements Observer<String> {
private final String name;
PrintObserver(String name) {
this.name = name;
}
@Override
public void onSubscribe(Disposable d) {
System.out.println(name + " Subscribed!");
}
@Override
public void onNext(String s) {
System.out.println(name + " onNext: " + s);
}
@Override
public void onError(Throwable e) {
System.out.println(name + " error: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println(name + " Done!");
}
}
private static void sleep(long seconds) {
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Thanks for the feedback, I'm using Java 8 and RxJava 2.1.0 and this is my output:
Without doOn* Operators*
Without doOn* Subscribed!
Without doOn* onNext: ONE
Without doOn* onNext: THREE
Without doOn* onNext: FOUR
Without doOn* Done!
With doOn* Operators
With doOn* Subscribed!
Event: ONE
With doOn* onNext: ONE
With doOn* error: Callable returned null
With RxJava 2.1.4 works fine!!!
Hi, I have a problem. Chain of calls stops, and I don't understand why.
RxActivityResult.on(activity)
.startIntent(Intent(activity, Recaptcha2Activity::class.java))
.map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
.subscribe({ }, { })
this code works, I am getting "onNext" method called. But when I am putting it into chain, next "flatMap" isn't called
.flatMap {
if(it.requiresCaptcha) {
RxActivityResult.on(activity)
.startIntent(Intent(activity, Recaptcha2Activity::class.java))
.map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
} else {
Observable.just("")
}
}
.flatMap { api.publish(id!!, null) }
What can be a reason? I see in debugger that RxActivityResult... starts, in deep of the library onNext method called, but it is not continued to next .flatMap
Thank you for answers
.flatMap(object: Function<RequiresCaptchaResponse, Observable<String>> {
override fun apply(it: RequiresCaptchaResponse): Observable<String> {
val subject = PublishSubject.create<String>()
if (it.requiresCaptcha) {
RxActivityResult.on(activity)
.startIntent(Intent(activity, Recaptcha2Activity::class.java))
.map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
.subscribe({ subject.onNext(it) })
return subject
} else {
return Observable.just("")
}
}
})
.flatMap { api.publishSendIt(id!!, it) }
Hello, I have a question about the operators implementation,
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
actual.onSubscribe(this);
}
}
I understand that this line DisposableHelper.validate(this.s, s)
is to assert that the value from upstream isn't null and it wasn't set before, but why in this line actual.onSubscribe(this);
we passed this
? why not just passing s
? what is the need for implementing Disposable ?
flatMap
has overloads with the maxConcurrency
parameter. Just set that to 4 and use the flatMap
as normal.
Hello guys! i have a problem with rxjava.
I have a list of image url and i need to convert it to jpeg, so i tried
Observable.fromIterable(entries)
.concatMap(entry->Observable.just(ImageUtils.convertFile(entry.getPath())))
.subscribeOn(Schedulers.io())
in ImageUtils.convertFile, i decode bitmap then convert it to jpeg.
It worked well on my Galaxy S7 Edge :tada:, but it throw out of memory exception in BitmapFactory.decodeFile
(ImageUtils) :( on low-end devices.
At first, i though it's because BitmapFactory.decodeFile use too much memory, but then i realize it can be error of Schedulers.io()
.
Do you have any advice to solve this problem? Thank you so much:D
Observable<String> ob = Observable.<String>create(t -> t.onNext("1")).
concatWith(Observable.create(t -> t.onNext("2")));
ob.subscribe(System.out::println);
. For some reason it only prints 1
. Any idea why ?
@quanlt Taking into account that you are working with bitmaps, I'd say that Schedulers#io
has nothing to do with OOM in this case
code that you provided shouldn't process few bitmaps in parallel so you'd rather check your ImageUtils#convertFile
@tabiulObservable#concatWith
will subscribe to Observable
that passed as its param only after parent Observable
:
Observable.<String>create(t -> t.onNext("1"))
completes, but in our case it won't complete ever