RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Thanks for the feedback, I'm using Java 8 and RxJava 2.1.0 and this is my output:
Without doOn* Operators
Without doOn* Subscribed!
Without doOn* onNext: ONE
Without doOn* onNext: THREE
Without doOn* onNext: FOUR
Without doOn* Done!
With doOn* Operators
With doOn* Subscribed!
Event: ONE
With doOn* onNext: ONE
With doOn* error: Callable returned null
With RxJava 2.1.4 it works fine!
Hi, I have a problem. The chain of calls stops, and I don't understand why.
RxActivityResult.on(activity)
.startIntent(Intent(activity, Recaptcha2Activity::class.java))
.map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
.subscribe({ }, { })
This code works on its own: the "onNext" method gets called. But when I put it into a chain, the next "flatMap" isn't called:
.flatMap {
if(it.requiresCaptcha) {
RxActivityResult.on(activity)
.startIntent(Intent(activity, Recaptcha2Activity::class.java))
.map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
} else {
Observable.just("")
}
}
.flatMap { api.publish(id!!, null) }
What could be the reason? I can see in the debugger that RxActivityResult... starts and that onNext is called deep inside the library, but the chain does not continue to the next .flatMap
Thank you for any answers.
.flatMap(object: Function<RequiresCaptchaResponse, Observable<String>> {
override fun apply(it: RequiresCaptchaResponse): Observable<String> {
val subject = PublishSubject.create<String>()
if (it.requiresCaptcha) {
RxActivityResult.on(activity)
.startIntent(Intent(activity, Recaptcha2Activity::class.java))
.map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
.subscribe({ subject.onNext(it) })
return subject
} else {
return Observable.just("")
}
}
})
.flatMap { api.publishSendIt(id!!, it) }
Hello, I have a question about how the operators are implemented:
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
actual.onSubscribe(this);
}
}
I understand that the line DisposableHelper.validate(this.s, s) asserts that the value from upstream isn't null and that it wasn't set before. But why do we pass this in the line actual.onSubscribe(this); and not just s? Why does the operator need to implement Disposable?
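One common reading of this pattern, sketched below as a toy model rather than RxJava's actual classes: by handing itself downstream, the operator gets to see the dispose() call, can update its own state, and only then forwards the call upstream. Every type here (Disposable, Observer, PassThrough, validate) is a hypothetical stand-in written for this sketch, and validate is deliberately simplified.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT RxJava's classes: all types below are hypothetical stand-ins.
class DisposableSketch {
    interface Disposable { void dispose(); }

    interface Observer {
        void onSubscribe(Disposable d);
        void onNext(String value);
    }

    /** Simplified check: reject null and reject a second upstream. */
    static boolean validate(Disposable current, Disposable next) {
        if (next == null) {
            return false;          // error reporting is omitted in this sketch
        }
        if (current != null) {
            next.dispose();        // already subscribed: cancel the extra upstream
            return false;
        }
        return true;
    }

    /** An operator that sits between upstream and downstream. */
    static final class PassThrough implements Observer, Disposable {
        final Observer actual;
        Disposable s;
        boolean disposed;

        PassThrough(Observer actual) { this.actual = actual; }

        @Override public void onSubscribe(Disposable s) {
            if (validate(this.s, s)) {
                this.s = s;
                actual.onSubscribe(this);   // downstream now talks to the operator
            }
        }

        @Override public void onNext(String value) {
            if (!disposed) {
                actual.onNext(value);
            }
        }

        @Override public void dispose() {
            disposed = true;   // operator-local state is updated first
            s.dispose();       // then the request is forwarded upstream
        }
    }

    /** Runs one scenario and returns a log of what happened. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Disposable[] handle = new Disposable[1];
        Observer downstream = new Observer() {
            @Override public void onSubscribe(Disposable d) { handle[0] = d; }
            @Override public void onNext(String value) { log.add("next:" + value); }
        };
        PassThrough op = new PassThrough(downstream);
        op.onSubscribe(() -> log.add("upstream disposed"));
        op.onNext("a");
        handle[0].dispose();   // goes through PassThrough.dispose()
        op.onNext("b");        // dropped, the operator knows it was disposed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // [next:a, upstream disposed]
    }
}
```

If the operator passed s straight through, the downstream dispose() would bypass it and "b" would still be delivered in this model. Passing this also saves allocating a separate wrapper object.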
flatMap has overloads with a maxConcurrency parameter. Just set that to 4 and use flatMap as normal.
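In RxJava 2 the cap is passed as a second argument, i.e. flatMap(mapper, 4). To show what "at most 4 at a time" means without pulling in the library, here is a plain-JDK analogue (a fixed thread pool, not RxJava). The class name, the work method and the counters are made up for this sketch, and unlike flatMap it returns results in submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK analogue of capping in-flight work; all names are hypothetical.
class BoundedConcurrencySketch {
    static final AtomicInteger active = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger();

    /** Simulated unit of work that records how many run at the same time. */
    static int work(int item) throws InterruptedException {
        int now = active.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(20);
            return item * 10;
        } finally {
            active.decrementAndGet();
        }
    }

    /** Runs `count` tasks with at most `maxConcurrency` of them in flight. */
    static List<Integer> runAll(int count, int maxConcurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                final int item = i;
                futures.add(pool.submit(() -> work(item)));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(10, 4));
        System.out.println("peak concurrency: " + peak.get());
    }
}
```

With flatMap the inner sources still need their own subscribeOn (or some other asynchronous boundary) to actually run in parallel; the maxConcurrency argument only limits how many are subscribed at once.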
Hello guys! I have a problem with RxJava.
I have a list of image URLs and I need to convert the images to JPEG, so I tried:
Observable.fromIterable(entries)
.concatMap(entry->Observable.just(ImageUtils.convertFile(entry.getPath())))
.subscribeOn(Schedulers.io())
In ImageUtils.convertFile, I decode the bitmap and then convert it to JPEG.
It worked well on my Galaxy S7 Edge :tada:, but on low-end devices it throws an out-of-memory exception in BitmapFactory.decodeFile (ImageUtils) :(
At first I thought it was because BitmapFactory.decodeFile uses too much memory, but then I realized it could be caused by Schedulers.io().
Do you have any advice on how to solve this problem? Thank you so much :D
Observable<String> ob = Observable.<String>create(t -> t.onNext("1")).
concatWith(Observable.create(t -> t.onNext("2")));
ob.subscribe(System.out::println);
For some reason it only prints 1. Any idea why?
@quanlt Given that you are working with bitmaps, I'd say that Schedulers#io has nothing to do with the OOM in this case. The code you provided shouldn't process several bitmaps in parallel, so you'd better check your ImageUtils#convertFile.
@tabiul Observable#concatWith subscribes to the Observable passed as its parameter only after the parent Observable, Observable.<String>create(t -> t.onNext("1")), completes. In your case it never completes, because onComplete is never called.
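In RxJava terms the fix would be to call t.onComplete() after t.onNext("1"). The rule itself can be shown with a small toy model; this is not RxJava, and Emitter, Source, concat and collect are hypothetical names defined only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT RxJava: every name here is hypothetical.
class ConcatSketch {
    /** Minimal stand-in for an emitter: values plus a completion signal. */
    interface Emitter {
        void onNext(String value);
        void onComplete();
    }

    /** A source is just a function that pushes signals into an emitter. */
    interface Source {
        void subscribe(Emitter emitter);
    }

    /** Subscribes to `second` only once `first` has signalled onComplete. */
    static Source concat(Source first, Source second) {
        return downstream -> first.subscribe(new Emitter() {
            @Override public void onNext(String value) { downstream.onNext(value); }
            @Override public void onComplete() { second.subscribe(downstream); }
        });
    }

    /** Collects everything the source emits synchronously. */
    static List<String> collect(Source source) {
        List<String> out = new ArrayList<>();
        source.subscribe(new Emitter() {
            @Override public void onNext(String value) { out.add(value); }
            @Override public void onComplete() { }
        });
        return out;
    }

    public static void main(String[] args) {
        Source neverCompletes = e -> e.onNext("1");
        Source completes = e -> { e.onNext("1"); e.onComplete(); };
        Source two = e -> { e.onNext("2"); e.onComplete(); };

        System.out.println(collect(concat(neverCompletes, two))); // [1]
        System.out.println(collect(concat(completes, two)));      // [1, 2]
    }
}
```

The first call mirrors the snippet from the question: "2" is never reached because the first source never signals completion.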
public static File copyFile(PhotoEntry entry, final Context context) throws IOException {
File file = new File(entry.getPath());
InputStream in = new FileInputStream(file);
File outputDir = new File(context.getFilesDir() + IMAGE_TMP_FOLDER);
if (!outputDir.exists()) {
outputDir.mkdir();
}
File outputFile = new File(context.getFilesDir() + IMAGE_TMP_FOLDER + "/" + entry.getName() + JPEG_EXTENSION);
try {
Bitmap bitmap = BitmapFactory.decodeFile(entry.getPath());
OutputStream out = new FileOutputStream(outputFile);
bitmap.compress(Bitmap.CompressFormat.JPEG, 100, out);
out.flush();
out.close();
ImageUtils.copyExif(entry.getPath(), outputFile.getPath());
bitmap.recycle();
} finally {
in.close();
}
return outputFile;
}
Schedulers.io