RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
val src = org.raisercostin.jedi.Locations.file("""c:\data\work2\docs-proces\images1""")
val srcObs: Observable[Seq[FileAlterated]] =
  src.watchFileCreated(1000)
    .doOnEach(x => println(s"got $x"))
    .tumblingBuffer(Duration(1, TimeUnit.SECONDS))
    .filter(_.nonEmpty)
    .doOnEach(event => println(s"$event => ignore"))
srcObs.toBlocking.last
retryWhen on a merge, but it doesn't get a throwable back from merge.
private Observable requestData(String token) {
    return Observable.merge(new Observable[]{
        mAssignmentRepository.sync(token),
        mFacilityRepository.sync(token),
        ...
    });
}
requestData(token)
    .retryWhen(errors -> errors) // <-- Only an object?
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.immediate())
    .subscribe();
retryWhen: I'm planning to catch a 401 Unauthorized and retry the call.
Hi everyone. How does one convert an Observable of one type to another type:
Observable<T> to Observable<R>
Are we supposed to use the to operator or perhaps flatMap?
When I try using flatMap it complains that Observable cannot be applied to an anonymous Function<Observable<T>, Observable<R>>
(T -> R) -> R
(T -> Observable<R>) -> R
(I hope I wrote that right)
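To make the two shapes above concrete, here is a minimal sketch, assuming they describe map (T -> R) and flatMap (T -> Observable<R>). It uses java.util.stream.Stream as a stand-in for Observable so it runs without any dependency. That substitution is an assumption for illustration, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapVsFlatMap {
    // map: each T becomes exactly one R (here String -> Integer).
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    // flatMap: each T becomes a whole sequence of R, and the sequences are flattened.
    static List<Character> letters(List<String> words) {
        return words.stream()
                .flatMap(w -> w.chars().mapToObj(c -> (char) c))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("ab", "cde");
        System.out.println(lengths(words));
        System.out.println(letters(words));
    }
}
```

So for a plain one-to-one conversion from T to R, map is usually enough; flatMap is for when the conversion itself yields a sequence.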
I'm guessing there is no preexisting way to accomplish something like this, right?
Observable<CustomEvent> events = listen().toCustomEvent();
events
    .map(Event::cause)
    .map(Cause::getStarter)
    .filter(User::isModerator)
    .subscribe(result -> {
        // Here result is a User (from Cause::getStarter), but what I would really
        // like is to then do something like:
        CustomEvent event = result;
        event.fiddleAroundABit();
        event.cancel();
    });
I run into this kind of issue all the time and end up with very stringy filters.
What I would like is something sorta like this:
BiObservable<String, Integer> test;
test
    .first().map(stringToDoubleMap::get)
    // At this point we have BiObservable<Double, Integer>
    .second().map(intToStringMap::get)
    // Now we have BiObservable<Double, String>
    .subscribe((first, second) -> {
        // Will emit when one from both sides is available.
    });
This would make something like this relatively simple:
Observable<CustomEvent> events;
events
    .splitToSecond(Event::cause)
    // BiObservable<CustomEvent, Cause>
    .second().map(Cause::getStarter)
    // BiObservable<CustomEvent, User>
    .second().filter(User::isModerator)
    // If one side of the 'pair' gets removed by filter the other side does too.
    .first().subscribe(event -> {
        // Only care about the first side.
    });
However I don't want to figure out how to make something like this possible until I know there is no 'standard' solution.
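One common way to get this effect without a dedicated BiObservable is to map each event to a pair that carries the original alongside the derived value, filter on the derived side, and then unwrap. Below is a minimal sketch of that idea. It uses java.util.stream in place of Observable so it runs without dependencies (the map/filter chain has the same shape), and the User, Cause and CustomEvent classes are hypothetical stand-ins for the ones in the snippets above.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CarryOriginal {
    // Hypothetical stand-ins for the User, Cause and CustomEvent types in the chat.
    static class User {
        final boolean moderator;
        User(boolean moderator) { this.moderator = moderator; }
        boolean isModerator() { return moderator; }
    }

    static class Cause {
        final User starter;
        Cause(User starter) { this.starter = starter; }
        User getStarter() { return starter; }
    }

    static class CustomEvent {
        final String id;
        final Cause cause;
        boolean cancelled = false;
        CustomEvent(String id, Cause cause) { this.id = id; this.cause = cause; }
        Cause cause() { return cause; }
        void cancel() { cancelled = true; }
    }

    // Returns the events whose starter is a moderator, after cancelling them.
    static List<CustomEvent> cancelModeratorEvents(List<CustomEvent> events) {
        List<CustomEvent> matched = events.stream()
                // Pair each event with its derived User instead of replacing it.
                .map(e -> new SimpleImmutableEntry<CustomEvent, User>(e, e.cause().getStarter()))
                // Filter on the derived side; the original travels with it.
                .filter(pair -> pair.getValue().isModerator())
                // Unwrap back to the original event.
                .map(pair -> pair.getKey())
                .collect(Collectors.toList());
        matched.forEach(CustomEvent::cancel);
        return matched;
    }

    public static void main(String[] args) {
        List<CustomEvent> events = Arrays.asList(
                new CustomEvent("e1", new Cause(new User(false))),
                new CustomEvent("e2", new Cause(new User(true))));
        for (CustomEvent e : cancelModeratorEvents(events)) {
            System.out.println(e.id + " cancelled=" + e.cancelled);
        }
    }
}
```

The pair plays the role of the two "sides": dropping a pair in filter drops both sides together, which is the behaviour described for the BiObservable idea.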
TimeUnit.SECONDS stuff?