RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
.flatMap(object : Function<RequiresCaptchaResponse, Observable<String>> {
    override fun apply(it: RequiresCaptchaResponse): Observable<String> {
        val subject = PublishSubject.create<String>()
        if (it.requiresCaptcha) {
            RxActivityResult.on(activity)
                .startIntent(Intent(activity, Recaptcha2Activity::class.java))
                .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH) ?: "" }
                .subscribe({ subject.onNext(it) })
            return subject
        } else {
            return Observable.just("")
        }
    }
})
.flatMap { api.publishSendIt(id!!, it) }
Hello, I have a question about how the operators are implemented:
@Override
public void onSubscribe(Disposable s) {
    if (DisposableHelper.validate(this.s, s)) {
        this.s = s;
        actual.onSubscribe(this);
    }
}
I understand that the line DisposableHelper.validate(this.s, s) asserts that the value from upstream isn't null and that it wasn't set before, but why do we pass this in the line actual.onSubscribe(this);? Why not just pass s? What is the need for implementing Disposable?
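One possible reason, sketched under assumptions: if the operator passes this downstream, a downstream dispose() call reaches the operator first, so it can update its own state before propagating the call upstream. The code below is a simplified model and not RxJava's source: SimpleDisposable, SimpleObserver, PassThroughOperator and DisposeDemo are hypothetical names, and the validate-style check is reduced to a plain null/already-set test.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal stand-ins, loosely modeled on the shape of
// RxJava 2's Disposable and Observer. These are NOT the library's classes.
interface SimpleDisposable {
    void dispose();
    boolean isDisposed();
}

interface SimpleObserver<T> {
    void onSubscribe(SimpleDisposable d);
    void onNext(T value);
}

// An intermediate operator sitting between upstream and downstream.
final class PassThroughOperator<T> implements SimpleObserver<T>, SimpleDisposable {
    private final SimpleObserver<T> actual; // downstream observer
    private SimpleDisposable s;             // upstream handle
    private boolean disposed;

    PassThroughOperator(SimpleObserver<T> actual) {
        this.actual = actual;
    }

    @Override
    public void onSubscribe(SimpleDisposable s) {
        // Simplified version of the validate idea: non-null and set only once.
        if (s == null || this.s != null) {
            return;
        }
        this.s = s;
        actual.onSubscribe(this); // downstream holds the operator, not upstream
    }

    @Override
    public void onNext(T value) {
        if (disposed) {
            return; // the operator knows about the disposal and stops forwarding
        }
        actual.onNext(value);
    }

    @Override
    public void dispose() {
        disposed = true;      // the operator's own bookkeeping comes first
        if (s != null) {
            s.dispose();      // then the call is propagated upstream
        }
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }
}

final class DisposeDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleDisposable[] handle = new SimpleDisposable[1];
        SimpleObserver<Integer> downstream = new SimpleObserver<Integer>() {
            @Override public void onSubscribe(SimpleDisposable d) { handle[0] = d; }
            @Override public void onNext(Integer v) {
                log.add("next " + v);
                if (v == 2) handle[0].dispose(); // downstream cancels after 2
            }
        };
        PassThroughOperator<Integer> op = new PassThroughOperator<>(downstream);
        boolean[] upstreamDisposed = {false};
        op.onSubscribe(new SimpleDisposable() {
            @Override public void dispose() {
                upstreamDisposed[0] = true;
                log.add("upstream disposed");
            }
            @Override public boolean isDisposed() { return upstreamDisposed[0]; }
        });
        // A source that ignores disposal and keeps pushing values.
        for (int i = 1; i <= 4; i++) {
            op.onNext(i);
        }
        log.add("operator disposed: " + op.isDisposed());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model, values 3 and 4 never reach downstream even though the source keeps emitting, because the operator saw the dispose() call. Had it passed s straight through, it would have had no way to notice.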
flatMap has overloads with the maxConcurrency parameter. Just set that to 4 and use flatMap as normal.
Hello guys! I have a problem with RxJava.
I have a list of image URLs and I need to convert the images to JPEG, so I tried
Observable.fromIterable(entries)
    .concatMap(entry -> Observable.just(ImageUtils.convertFile(entry.getPath())))
    .subscribeOn(Schedulers.io())
In ImageUtils.convertFile, I decode the bitmap and then convert it to JPEG.
It worked well on my Galaxy S7 Edge :tada:, but it throws an out-of-memory exception in BitmapFactory.decodeFile (ImageUtils) :( on low-end devices.
At first, I thought it was because BitmapFactory.decodeFile uses too much memory, but then I realized it could be caused by Schedulers.io().
Do you have any advice to solve this problem? Thank you so much :D
Observable<String> ob = Observable.<String>create(t -> t.onNext("1"))
    .concatWith(Observable.create(t -> t.onNext("2")));
ob.subscribe(System.out::println);
For some reason it only prints 1. Any idea why?
@quanlt Taking into account that you are working with bitmaps, I'd say that Schedulers#io has nothing to do with the OOM in this case. The code that you provided shouldn't process several bitmaps in parallel, so you'd rather check your ImageUtils#convertFile.
@tabiul Observable#concatWith will subscribe to the Observable passed as its parameter only after the parent Observable, Observable.<String>create(t -> t.onNext("1")), completes, but in our case it won't ever complete.
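To illustrate the point about completion, here is a minimal sketch of sequential concatenation in plain Java. It is a simplified model under assumptions, not RxJava's implementation: MiniSource and ConcatDemo are hypothetical names, and the concat helper does only one thing, namely subscribe to the second source from inside the first source's completion callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, heavily simplified push-based source. Not RxJava's API.
interface MiniSource<T> {
    void subscribe(Consumer<T> onNext, Runnable onComplete);
}

final class ConcatDemo {
    // The second source is subscribed to only when the first one signals completion.
    static <T> MiniSource<T> concat(MiniSource<T> first, MiniSource<T> second) {
        return (onNext, onComplete) ->
            first.subscribe(onNext, () -> second.subscribe(onNext, onComplete));
    }

    // Collects every emitted value, plus a "done" marker on completion.
    static List<String> collect(MiniSource<String> source) {
        List<String> out = new ArrayList<>();
        source.subscribe(out::add, () -> out.add("done"));
        return out;
    }

    // Mirrors the snippet from the question: values are emitted, completion never is.
    static List<String> withoutCompletion() {
        MiniSource<String> one = (onNext, onComplete) -> onNext.accept("1");
        MiniSource<String> two = (onNext, onComplete) -> onNext.accept("2");
        return collect(concat(one, two));
    }

    // Same sources, but each one signals completion after its value.
    static List<String> withCompletion() {
        MiniSource<String> one = (onNext, onComplete) -> {
            onNext.accept("1");
            onComplete.run();
        };
        MiniSource<String> two = (onNext, onComplete) -> {
            onNext.accept("2");
            onComplete.run();
        };
        return collect(concat(one, two));
    }

    public static void main(String[] args) {
        System.out.println(withoutCompletion()); // [1]
        System.out.println(withCompletion());    // [1, 2, done]
    }
}
```

In RxJava terms, the corresponding change to the original snippet would be to call t.onComplete() after t.onNext("1") inside the first create, so that concatWith gets the signal to move on to the second Observable.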
public static File copyFile(PhotoEntry entry, final Context context) throws IOException {
    File file = new File(entry.getPath());
    InputStream in = new FileInputStream(file);
    File outputDir = new File(context.getFilesDir() + IMAGE_TMP_FOLDER);
    if (!outputDir.exists()) {
        outputDir.mkdir();
    }
    File outputFile = new File(context.getFilesDir() + IMAGE_TMP_FOLDER + "/" + entry.getName() + JPEG_EXTENSION);
    try {
        Bitmap bitmap = BitmapFactory.decodeFile(entry.getPath());
        OutputStream out = new FileOutputStream(outputFile);
        bitmap.compress(Bitmap.CompressFormat.JPEG, 100, out);
        out.flush();
        out.close();
        ImageUtils.copyExif(entry.getPath(), outputFile.getPath());
        bitmap.recycle();
    } finally {
        in.close();
    }
    return outputFile;
}
Schedulers.io
HystrixCommand.toObservable.concatMap{ // read the page information and, if there is a further page, return a new HystrixCommand Observable }. This is done recursively. The main problem with this is that if there are a lot of pages, memory will fill up, as there would be a lot of observables waiting to be executed, and in reality the user might not need all of them at the same time. Is there a way to do this lazily, so that when a subscriber consumes, concatMap adds maybe 2 more observables? I know there is the doOnNext API, but that does not allow modifying the Observable.
val src = org.raisercostin.jedi.Locations.file("""c:\data\work2\docs-proces\images1""")
val srcObs: Observable[Seq[FileAlterated]] =
  src.watchFileCreated(1000)
    .doOnEach(x => println(s"got $x"))
    .tumblingBuffer(Duration(1, TimeUnit.SECONDS))
    .filter(_.nonEmpty)
    .doOnEach(event => {
      println(s"$event => ignore")
    })
srcObs.toBlocking.last
retryWhen on a merge, but it doesn't get a throwable back from merge.
private Observable requestData(String token) {
    return Observable.merge(new Observable[]{
        mAssignmentRepository.sync(token),
        mFacilityRepository.sync(token),
        ...
    });
}
requestData(token)
    .retryWhen(errors -> errors) // <-- Only an object?
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.immediate())
    .subscribe();
With retryWhen, I'm planning to catch a 401 Unauthorized and retry the call.
Hi everyone. How does one convert an Observable of one type to another type, i.e. Observable<T> to Observable<R>?
Are we supposed to use the to operator or perhaps flatMap?
When I try using flatMap, it complains that it cannot be applied to an anonymous Function<Observable<T>, Observable<R>>.