RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Hello guys! I have a problem with RxJava.
I have a list of image URLs and I need to convert the images to JPEG, so I tried
Observable.fromIterable(entries)
.concatMap(entry->Observable.just(ImageUtils.convertFile(entry.getPath())))
.subscribeOn(Schedulers.io())
In ImageUtils.convertFile, I decode the bitmap and then convert it to JPEG.
It worked well on my Galaxy S7 Edge :tada:, but on low-end devices it throws an out-of-memory exception in BitmapFactory.decodeFile (ImageUtils) :(
At first I thought it was because BitmapFactory.decodeFile uses too much memory, but then I realized it could be a problem with Schedulers.io().
Do you have any advice to solve this problem? Thank you so much :D
Observable<String> ob = Observable.<String>create(t -> t.onNext("1")).
concatWith(Observable.create(t -> t.onNext("2")));
ob.subscribe(System.out::println);
For some reason it only prints 1. Any idea why?
@quanlt Taking into account that you are working with bitmaps, I'd say that Schedulers#io has nothing to do with the OOM in this case.
The code that you provided shouldn't process several bitmaps in parallel, so you'd better check your ImageUtils#convertFile.
@tabiul Observable#concatWith will subscribe to the Observable passed as its param only after the parent Observable:
Observable.<String>create(t -> t.onNext("1"))
completes, but in our case it will never complete.
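To make the point about completion concrete, here is a minimal plain-Java sketch. It is a toy model and not RxJava's implementation: Sink, Source, concat and collect are hypothetical names invented for the illustration. It only shows that a concat-style operator cannot move on to the second source until the first one signals completion.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT RxJava's implementation. All names here are hypothetical.
class ConcatSketch {
    /** Receives items and a completion signal, like a tiny observer. */
    interface Sink {
        void onNext(String item);
        void onComplete();
    }

    /** A source pushes into a Sink when subscribed. */
    interface Source {
        void subscribe(Sink sink);
    }

    /** Subscribes to second only once first has signalled completion. */
    static Source concat(Source first, Source second) {
        return downstream -> first.subscribe(new Sink() {
            @Override public void onNext(String item) { downstream.onNext(item); }
            @Override public void onComplete() { second.subscribe(downstream); }
        });
    }

    /** Collects everything the source emits synchronously. */
    static List<String> collect(Source source) {
        List<String> out = new ArrayList<>();
        source.subscribe(new Sink() {
            @Override public void onNext(String item) { out.add(item); }
            @Override public void onComplete() { }
        });
        return out;
    }

    public static void main(String[] args) {
        Source neverCompletes = sink -> sink.onNext("1");
        Source completes = sink -> { sink.onNext("1"); sink.onComplete(); };
        Source two = sink -> { sink.onNext("2"); sink.onComplete(); };

        System.out.println(collect(concat(neverCompletes, two))); // [1]
        System.out.println(collect(concat(completes, two)));      // [1, 2]
    }
}
```

In the RxJava snippet, the equivalent change would be to call onComplete() on the emitter (onCompleted() in RxJava 1.x) after onNext("1").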
public static File copyFile(PhotoEntry entry, final Context context) throws IOException {
File file = new File(entry.getPath());
InputStream in = new FileInputStream(file);
File outputDir = new File(context.getFilesDir() + IMAGE_TMP_FOLDER);
if (!outputDir.exists()) {
outputDir.mkdir();
}
File outputFile = new File(context.getFilesDir() + IMAGE_TMP_FOLDER + "/" + entry.getName() + JPEG_EXTENSION);
try {
Bitmap bitmap = BitmapFactory.decodeFile(entry.getPath());
OutputStream out = new FileOutputStream(outputFile);
bitmap.compress(Bitmap.CompressFormat.JPEG, 100, out);
out.flush();
out.close();
ImageUtils.copyExif(entry.getPath(), outputFile.getPath());
bitmap.recycle();
} finally {
in.close();
}
return outputFile;
}
Schedulers.io
HystrixCommand.toObservable.concatMap{ // read the page information and if there is further page then return new HystrixCommand Observable}
This is done recursively. The main problem is that if there are a lot of pages, memory fills up because there are a lot of observables waiting to be executed, and in reality the user might not need all of them at the same time. Is there a way to do this lazily, so that when a subscriber consumes an item, concatMap adds maybe 2 more observables? I know there is a doOnNext API, but that does not allow modifying the Observable.
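The "only fetch when the consumer asks" idea can be sketched in plain Java with an Iterator. This is not Hystrix or RxJava code: Page, pages and the fetch function are hypothetical stand-ins, and the page numbering starting at 1 is an assumption.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

// Plain-Java sketch of lazy paging. Page and fetch are hypothetical stand-ins.
class LazyPages {
    /** One page of results plus a flag saying whether another page follows. */
    static final class Page {
        final int number;
        final boolean hasNext;
        Page(int number, boolean hasNext) { this.number = number; this.hasNext = hasNext; }
    }

    /** Returns an iterator that calls fetch once per page, and only on demand. */
    static Iterator<Page> pages(IntFunction<Page> fetch) {
        return new Iterator<Page>() {
            private int nextNumber = 1;   // assumption: pages are numbered from 1
            private boolean more = true;

            @Override public boolean hasNext() { return more; }

            @Override public Page next() {
                if (!more) throw new NoSuchElementException();
                Page page = fetch.apply(nextNumber++);
                more = page.hasNext;      // stop once the last page says so
                return page;
            }
        };
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Iterator<Page> it = pages(n -> { calls[0]++; return new Page(n, n < 100); });
        it.next();
        it.next();
        System.out.println(calls[0]); // 2: only the pages actually consumed were fetched
    }
}
```

With RxJava, the analogous idea would be to create each page's Observable lazily (for example with Observable.defer) so that nothing is built until it is subscribed. Whether that fits the recursive HystrixCommand setup is something to verify.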
val src = org.raisercostin.jedi.Locations.file("""c:\data\work2\docs-proces\images1""")
val srcObs: Observable[Seq[FileAlterated]] =
src.watchFileCreated(1000).doOnEach(x=>println(s"got $x")).tumblingBuffer(Duration(1,TimeUnit.SECONDS)).filter(_.nonEmpty).doOnEach(event=>{
println(s"$event => ignore")
})
srcObs.toBlocking.last
retryWhen on a merge, but it doesn't get a throwable back from merge.
private Observable requestData(String token) {
return Observable.merge(new Observable[]{
mAssignmentRepository.sync(token),
mFacilityRepository.sync(token),
...
}
requestData(token)
.retryWhen(errors -> errors) <-- Only a object?
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.immediate())
.subscribe;
With retryWhen I'm planning to catch a 401 Unauthorized and retry the call.
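The "retry only on a 401" decision can be sketched without RxJava as a plain retry loop with a predicate. This is a swapped-in technique for illustration, not the retryWhen operator: HttpError, its code field and the retry helper are all hypothetical.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Plain-Java sketch of "retry only when the failure is a 401".
// HttpError and its code field are hypothetical, not a real library type.
class RetrySketch {
    static final class HttpError extends RuntimeException {
        final int code;
        HttpError(int code) { super("HTTP " + code); this.code = code; }
    }

    /** Runs call, retrying while shouldRetry accepts the failure, up to maxAttempts tries in total. */
    static <T> T retry(Supplier<T> call, Predicate<RuntimeException> shouldRetry, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts || !shouldRetry.test(e)) throw e;
                // A real client would refresh the token here before trying again.
            }
        }
    }

    public static void main(String[] args) {
        int[] tries = {0};
        String result = retry(() -> {
            if (++tries[0] < 3) throw new HttpError(401);
            return "synced";
        }, e -> e instanceof HttpError && ((HttpError) e).code == 401, 5);
        System.out.println(result + " after " + tries[0] + " tries"); // synced after 3 tries
    }
}
```

On the "Only a object?" remark: one possible cause is that requestData is declared with the raw type Observable. Java erases generics on raw types, so the lambda parameter would be typed as Object. Declaring a parameterized return type may restore the Throwable type, but that is worth checking against the RxJava version in use.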
Hi everyone. How does one convert an Observable of one type to another type, Observable<T> to Observable<R>?
Are we supposed to use the to operator or perhaps flatMap?
When I try using flatMap, it complains that observable cannot be applied to anonymous function<Observable<T>, Observable<R>>
(T -> R) -> R
(T -> Observable<R>) -> R
(I hope I wrote that right)
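Reading the two signatures above as map (a function T -> R) and flatMap (a function T -> Observable<R>), which is an assumption about what was meant, the difference can be shown with java.util.stream.Stream standing in for Observable. The class and method names below are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// java.util.stream.Stream stands in for Observable; the operator shapes are analogous.
class MapVsFlatMap {
    /** map: the function returns a plain value (T -> R). */
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    /** flatMap: the function returns a container of values (T -> Stream<R>). */
    static List<String> letters(List<String> words) {
        return words.stream()
                .flatMap(w -> Arrays.stream(w.split("")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("ab", "c");
        System.out.println(lengths(words)); // [2, 1]
        System.out.println(letters(words)); // [a, b, c]
    }
}
```

So for a plain conversion from Observable<T> to Observable<R>, map with a T -> R function is usually enough. flatMap is for the case where each T produces its own Observable<R>.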
I'm guessing there is no preexisting way to accomplish something like this, right?
Observable<CustomEvent> events = listen().toCustomEvent();
events
.map(Event::cause)
.map(Cause::getStarter)
.filter(User::isModerator)
.subscribe(result -> {
//Here result is a user(Cause::getStarter), but what I would really
//like is to then do something like.
CustomEvent event = result;
event.fiddleAroundABit();
event.cancel();
});
I run into this kind of issue all the time and end up with very stringy filters.
What I would like is something sorta like this:
BiObservable<String, Integer> test;
test
.first().map(stringToDoubleMap::get)
//At this point we have BiObservable<Double, Integer>
.second().map(intToStringMap::get)
//Now we have BiObservable<Double, String>
.subscribe((first, second) -> {
//Will emit when one from both sides is available.
});
This would make something like this relatively simple:
Observable<CustomEvent> events;
events
.splitToSecond(Event::cause)
//BiObservable<CustomEvent, Cause>
.second().map(Cause::getStarter)
//BiObservable<CustomEvent, User>
.second().filter(User::isModerator)
//If one side of the 'pair' gets removed by filter the other side does too.
.first().subscribe(event -> {
//Only care about the first side.
});
However I don't want to figure out how to make something like this possible until I know there is no 'standard' solution.
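One common way to get close to this without a BiObservable type is to map each item to a pair that keeps the original next to the derived value, filter on the derived side, then unwrap. A minimal sketch, with java.util.stream.Stream standing in for Observable; Event, User and their fields are hypothetical simplifications of the types in the question.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Stream stands in for Observable; Event and User are hypothetical minimal types.
class PairSketch {
    static final class User {
        final String name;
        final boolean moderator;
        User(String name, boolean moderator) { this.name = name; this.moderator = moderator; }
    }

    static final class Event {
        final String id;
        final User starter;
        Event(String id, User starter) { this.id = id; this.starter = starter; }
    }

    /** Pairs each event with its starter, filters on the starter, then returns the event ids. */
    static List<String> startedByModerator(List<Event> events) {
        return events.stream()
                .map(e -> new SimpleImmutableEntry<Event, User>(e, e.starter)) // (event, user)
                .filter(pair -> pair.getValue().moderator)                     // test the derived side
                .map(pair -> pair.getKey().id)                                 // back to the original side
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", new User("ann", true)),
                new Event("b", new User("bob", false)));
        System.out.println(startedByModerator(events)); // [a]
    }
}
```

Observable offers map and filter as well, so the same chain should carry over, with the subscriber receiving the original event instead of the derived user.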