RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
I'm guessing there is no preexisting way to accomplish something like this, right?
Observable<CustomEvent> events = listen().toCustomEvent();
events
.map(Event::cause)
.map(Cause::getStarter)
.filter(User::isModerator)
.subscribe(result -> {
//Here result is a user(Cause::getStarter), but what I would really
//like is to then do something like.
CustomEvent event = result;
event.fiddleAroundABit();
event.cancel();
});
I run into this kind of issue all the time and end up with very stringy filters.
What I would like is something sorta like this:
BiObservable<String, Integer> test;
test
.first().map(stringToDoubleMap::get)
//At this point we have BiObservable<Double, Integer>
.second().map(intToStringMap::get)
//Now we have BiObservable<Double, String>
.subscribe((first, second) -> {
//Will emit when one from both sides is available.
});
This would make something like this relatively simple:
Observable<CustomEvent> events;
events
.splitToSecond(Event::cause)
//BiObservable<CustomEvent, Cause>
.second().map(Cause::getStarter)
//BiObservable<CustomEvent, User>
.second().filter(User::isModerator)
//If one side of the 'pair' gets removed by filter the other side does too.
.first().subscribe(event -> {
//Only care about the first side.
});
However, I don't want to figure out how to make something like this possible until I know there is no 'standard' solution.
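One common workaround for the question above is to carry the original event along in a pair while mapping the derived value, then drop the pair before subscribing. The sketch below uses java.util.stream only so that it runs without dependencies; Observable has map and filter operators of the same shape, so the chain should translate directly. The CustomEvent, Cause and User classes are hypothetical stand-ins for the types in the snippet, and startedByModerator is an illustrative name.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PairCarry {
    // Hypothetical stand-ins for the types used in the chat snippet.
    static final class User {
        final boolean moderator;
        User(boolean moderator) { this.moderator = moderator; }
        boolean isModerator() { return moderator; }
    }

    static final class Cause {
        final User starter;
        Cause(User starter) { this.starter = starter; }
        User getStarter() { return starter; }
    }

    static final class CustomEvent {
        final String id;
        final Cause cause;
        boolean cancelled;
        CustomEvent(String id, Cause cause) { this.id = id; this.cause = cause; }
        Cause cause() { return cause; }
        void cancel() { cancelled = true; }
    }

    // Keeps the event as the pair's key while the value is mapped and filtered.
    static List<CustomEvent> startedByModerator(List<CustomEvent> events) {
        return events.stream()
                // (event, cause)
                .map(e -> new SimpleImmutableEntry<>(e, e.cause()))
                // (event, user)
                .map(p -> new SimpleImmutableEntry<>(p.getKey(), p.getValue().getStarter()))
                // filtering on the value drops the whole pair
                .filter(p -> p.getValue().isModerator())
                // only the event side is needed downstream
                .map(p -> p.getKey())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        CustomEvent a = new CustomEvent("a", new Cause(new User(true)));
        CustomEvent b = new CustomEvent("b", new Cause(new User(false)));
        for (CustomEvent e : startedByModerator(Arrays.asList(a, b))) {
            e.cancel();
            System.out.println(e.id + " cancelled=" + e.cancelled);
        }
    }
}
```

If the derived value is only needed for the filter, a single `filter(e -> e.cause().getStarter().isModerator())` avoids the pair entirely; the pair only pays off when later steps need both sides.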
TimeUnit.SECONDS
stuff?
Hi
I have a problem with RxJavaPlugins.onScheduleHandler.
When the thread pool changes, this method is invoked:
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
My Runnable gets wrapped in a DisposeTask, so I can't check the type of the action inside the schedule handler, because DisposeTask is package-private.
Do you have any idea how to solve this problem without reflection? :)
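A possible reflection-free approach is to unwrap through a public introspection interface instead of testing for the wrapper class. Going by the snippet, onSchedule is called on the raw `run` before it is wrapped, so the handler should see the original Runnable on that call; the wrapper only shows up when the worker schedules the task. RxJava 2.1.7 and later ship io.reactivex.schedulers.SchedulerRunnableIntrospection with a getWrappedRunnable() method, and as far as I recall DisposeTask implements it in those versions, which is worth verifying against the version in use. The sketch below is a dependency-free model of the idea: `Wrapped`, `Wrapper` and `MyAction` are hypothetical stand-ins, not RxJava classes.

```java
class UnwrapSketch {
    // Hypothetical stand-in for a public introspection interface
    // (modelled on SchedulerRunnableIntrospection.getWrappedRunnable()).
    interface Wrapped {
        Runnable getWrappedRunnable();
    }

    // Models a library-internal wrapper such as DisposeTask.
    static final class Wrapper implements Runnable, Wrapped {
        private final Runnable inner;
        Wrapper(Runnable inner) { this.inner = inner; }
        @Override public void run() { inner.run(); }
        @Override public Runnable getWrappedRunnable() { return inner; }
    }

    // The task type the schedule handler wants to recognise (hypothetical).
    static final class MyAction implements Runnable {
        @Override public void run() { }
    }

    // Peels off wrappers until a Runnable that exposes no inner task is reached.
    static Runnable unwrap(Runnable r) {
        while (r instanceof Wrapped) {
            r = ((Wrapped) r).getWrappedRunnable();
        }
        return r;
    }

    // What a schedule handler could call instead of checking the wrapper type.
    static boolean isMyAction(Runnable r) {
        return unwrap(r) instanceof MyAction;
    }

    public static void main(String[] args) {
        Runnable task = new Wrapper(new MyAction());
        System.out.println(isMyAction(task));
    }
}
```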
Has anybody solved automatic removal of completed subscriptions from a CompositeSubscription, to release memory held by unused stuff? The only thing that comes to my mind is something like this:
public static <T> void subscribeWithAutoRemove(Observable<T> observable, Action1<T> action, CompositeSubscription compositeSubscription) {
MutableObject<Subscription> subscriptionHolder = new MutableObject<>();
subscriptionHolder.set(observable
.doOnCompleted(() -> {
if (subscriptionHolder.isSet()) {
compositeSubscription.remove(subscriptionHolder.get());
}
})
.subscribe(action));
if (!subscriptionHolder.get().isUnsubscribed()) {
compositeSubscription.add(subscriptionHolder.get());
}
}
Maybe there is a better way?