RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
flatMap
Observable<Data> signal1 = hotSignal.filter(data -> data.id == 1).startWith(emptyData);
Observable<Data> signal2 = hotSignal.filter(data -> data.id == 2).startWith(emptyData);
Observable<Data> signal3 = hotSignal.filter(data -> data.id == 3).startWith(emptyData);
Observable<String> latestData = Observable.combineLatest(
    signal1, signal2, signal3,
    (data1, data2, data3) -> mergeData(data1, data2, data3)
);
latestData.sample(1, TimeUnit.SECONDS).subscribe(mergedData -> System.out.println(mergedData));
Observable first = Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        // Register a callback that should run when this subscriber is unsubscribed.
        subscriber.add(Subscriptions.create(() -> {
            log("first unsubscribe");
        }));
    }
}).doOnUnsubscribe(() -> log("first"));
Observable second = Observable.create(…).doOnUnsubscribe(() -> log("second"));
Observable result = first.mergeWith(second).doOnUnsubscribe(() -> log("result"));
Subscription subscription = result.subscribe(…);
//…
subscription.unsubscribe();
It logs only "result". It looks like unsubscription is not propagated to the observables being merged. So how can I handle unsubscription inside the first observable's Observable.OnSubscribe?
combineLatest is a pretty natural approach
final Func0<HashMap<Integer, String>> initialState = () -> new HashMap<>();
final HashMap<Integer, String> output = new HashMap<>();
source.window(1, TimeUnit.SECONDS)
    .flatMap(observable -> observable.collect(
        initialState,
        (mostRecent, data) -> mostRecent.put(data.id, data.content))
    )
    .map(input -> {
        output.putAll(input);
        return output;
    }).subscribe(reduced -> System.out.println(output));
.map is kind of kludgy; I'm pondering a better way to produce the output dictionary.
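One possible direction, sketched in plain Java rather than RxJava so it runs on its own: treat the output dictionary as a running fold over the per-window maps, and return a fresh unmodifiable snapshot at each step instead of mutating a shared HashMap. In RxJava this shape would probably map onto a scan-style accumulator, but that is an assumption and untested here. The Data class and the names LatestById, collectWindow, accumulate and snapshots are hypothetical, made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LatestById {

    // Hypothetical stand-in for the Data type in the snippets above (id + content).
    static final class Data {
        final int id;
        final String content;

        Data(int id, String content) {
            this.id = id;
            this.content = content;
        }
    }

    // Collapse one window of events into "latest content per id",
    // mirroring what the collect(...) call does for each window.
    static Map<Integer, String> collectWindow(List<Data> window) {
        Map<Integer, String> mostRecent = new HashMap<>();
        for (Data d : window) {
            mostRecent.put(d.id, d.content);
        }
        return mostRecent;
    }

    // Scan-style step: merge a window's map into a copy of the previous
    // snapshot. The previous snapshot is left untouched.
    static Map<Integer, String> accumulate(Map<Integer, String> previous,
                                           Map<Integer, String> window) {
        Map<Integer, String> next = new HashMap<>(previous);
        next.putAll(window);
        return Collections.unmodifiableMap(next);
    }

    // Produce one running snapshot per window.
    static List<Map<Integer, String>> snapshots(List<List<Data>> windows) {
        List<Map<Integer, String>> out = new ArrayList<>();
        Map<Integer, String> state = Collections.emptyMap();
        for (List<Data> window : windows) {
            state = accumulate(state, collectWindow(window));
            out.add(state);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Data>> windows = new ArrayList<>();
        windows.add(Arrays.asList(new Data(1, "a"), new Data(2, "b"), new Data(1, "c")));
        windows.add(Arrays.asList(new Data(3, "d")));
        for (Map<Integer, String> snapshot : snapshots(windows)) {
            System.out.println(snapshot);
        }
    }
}
```

The trade-off is one map copy per window in exchange for snapshots that downstream code can hold onto safely; with a one-second window and a handful of ids that cost is likely negligible.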