RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Observable<Data> signal1 = hotSignal.filter(data -> data.id == 1).startWith(emptyData);
Observable<Data> signal2 = hotSignal.filter(data -> data.id == 2).startWith(emptyData);
Observable<Data> signal3 = hotSignal.filter(data -> data.id == 3).startWith(emptyData);
Observable<String> latestData = Observable.combineLatest(
    signal1, signal2, signal3,
    (data1, data2, data3) -> mergeData(data1, data2, data3)
);
latestData.sample(1, TimeUnit.SECONDS).subscribe(mergedData -> System.out.println(mergedData));
Observable first = Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        // Register a callback that runs when this subscriber unsubscribes.
        subscriber.add(Subscriptions.create(() -> {
            log("first unsubscribe");
        }));
    }
}).doOnUnsubscribe(() -> log("first"));
Observable second = Observable.create(…).doOnUnsubscribe(() -> log("second"));
Observable result = first.mergeWith(second).doOnUnsubscribe(() -> loge("result"));
Subscription subscription = result.subscribe(…);
//…
subscription.unsubscribe();
It logs only “result”. It looks like unsubscription is not propagated to merge’s child observables. How can I handle unsubscription inside the first observable’s Observable.OnSubscribe?
combineLatest is a pretty natural approach
final Func0<HashMap<Integer, String>> initialState = () -> new HashMap<>();
final HashMap<Integer, String> output = new HashMap<>();
source.window(1, TimeUnit.SECONDS)
    .flatMap(observable -> observable.collect(
        initialState,
        (mostRecent, data) -> mostRecent.put(data.id, data.content))
    )
    .map(input -> {
        output.putAll(input);
        return output;
    }).subscribe(reduced -> System.out.println(output));
.map is kind of kludgy; I'm pondering a better way to produce the output dictionary.
scan the original source instead of collect, and sample the result.
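To make the scan-then-sample idea concrete, here is a minimal plain-Java sketch of the semantics, with no RxJava dependency. It only simulates the behaviour on a fixed, time-ordered list of events. The Event class, the scanAndSample helper and the timestamps are hypothetical names made up for illustration, and the final flush at the end of the input is a choice of this sketch, not a statement about how the library's sample operator behaves on completion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ScanAndSampleSketch {

    // Hypothetical event type: an id, a payload, and an arrival time in milliseconds.
    static final class Event {
        final int id;
        final String content;
        final long timeMs;

        Event(int id, String content, long timeMs) {
            this.id = id;
            this.content = content;
            this.timeMs = timeMs;
        }
    }

    // Folds time-ordered events into a running "latest content per id" map (the scan part)
    // and records one copy of that map per elapsed period in which it changed (the sample part).
    static List<Map<Integer, String>> scanAndSample(List<Event> events, long periodMs) {
        List<Map<Integer, String>> samples = new ArrayList<>();
        Map<Integer, String> latest = new LinkedHashMap<>();
        long nextTick = periodMs;
        boolean changed = false;
        for (Event e : events) {
            // Cross every period boundary that passed before this event arrived.
            while (e.timeMs >= nextTick) {
                if (changed) {
                    samples.add(new LinkedHashMap<>(latest)); // copy, so later puts do not alter it
                    changed = false;
                }
                nextTick += periodMs;
            }
            latest.put(e.id, e.content);
            changed = true;
        }
        // Flush whatever arrived after the last boundary (a choice made for this sketch).
        if (changed) {
            samples.add(new LinkedHashMap<>(latest));
        }
        return samples;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event(1, "a", 100),
            new Event(2, "b", 400),
            new Event(1, "c", 1200),
            new Event(3, "d", 2500));
        for (Map<Integer, String> sample : scanAndSample(events, 1000)) {
            System.out.println(sample);
        }
        // Prints:
        // {1=a, 2=b}
        // {1=c, 2=b}
        // {1=c, 2=b, 3=d}
    }
}
```

Each sample is a copy of the running map, so a consumer holding an earlier sample does not see later updates.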
reduced == output, so you can just use reduced in subscribe.
It is not obvious to me, either, that reduced is the same as output. The output of each window is a Map. The final step is merging the map emitted by the current window with the map from the previous window.
scan also emits all intermediate values. Not sure that bodes well if you are emitting the same mutable data type each time.
The map step ends with return output; if I understand that correctly, that means that in .subscribe(reduced -> ...), reduced is the output variable returned by map.
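The two points above (reduced being the very same object as output, and the worry about emitting one mutable map repeatedly) can be illustrated without RxJava. This is only a sketch under assumptions: emitShared and emitCopies are hypothetical helpers that stand in for the map step, and the list of window maps stands in for what each window would collect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SharedMapEmissionSketch {

    // Merges each window into one shared map and emits that same instance every time,
    // which mirrors a map step that returns a shared output field.
    static List<Map<Integer, String>> emitShared(List<Map<Integer, String>> windows) {
        Map<Integer, String> output = new HashMap<>();
        List<Map<Integer, String>> emitted = new ArrayList<>();
        for (Map<Integer, String> window : windows) {
            output.putAll(window);
            emitted.add(output); // every element refers to the same object
        }
        return emitted;
    }

    // Same merge, but emits an independent snapshot after each window.
    static List<Map<Integer, String>> emitCopies(List<Map<Integer, String>> windows) {
        Map<Integer, String> output = new HashMap<>();
        List<Map<Integer, String>> emitted = new ArrayList<>();
        for (Map<Integer, String> window : windows) {
            output.putAll(window);
            emitted.add(new HashMap<>(output)); // copy, unaffected by later windows
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map<Integer, String>> windows = Arrays.asList(
            Collections.singletonMap(1, "a"),
            Collections.singletonMap(2, "b"));

        List<Map<Integer, String>> shared = emitShared(windows);
        System.out.println(shared.get(0) == shared.get(1)); // true
        System.out.println(shared.get(0).size());           // 2

        List<Map<Integer, String>> copies = emitCopies(windows);
        System.out.println(copies.get(0) == copies.get(1)); // false
        System.out.println(copies.get(0).size());           // 1
    }
}
```

With the shared instance, anything that holds on to an earlier emission sees it change when the next window is merged. That is harmless if the subscriber only prints immediately, but it can surprise downstream operators that buffer or compare values.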