RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Observable<Data> signal1 = hotSignal.filter(data -> data.id == 1).startWith(emptyData);
Observable<Data> signal2 = hotSignal.filter(data -> data.id == 2).startWith(emptyData);
Observable<Data> signal3 = hotSignal.filter(data -> data.id == 3).startWith(emptyData);
Observable<String> latestData = Observable.combineLatest(
    signal1, signal2, signal3,
    (data1, data2, data3) -> mergeData(data1, data2, data3)
);
latestData.sample(1, TimeUnit.SECONDS).subscribe(mergedData -> System.out.println(mergedData));
Observable first = Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        subscriber.add(Subscriptions.create(() -> {
            log("first unsubscribe");
        }));
    }
}).doOnUnsubscribe(() -> log("first"));
Observable second = Observable.create(…).doOnUnsubscribe(() -> log("second"));
Observable result = first.mergeWith(second).doOnUnsubscribe(() -> loge("result"));
Subscription subscription = result.subscribe(…);
//…
subscription.unsubscribe();
It logs only “result”. It looks like unsubscription is not propagated to merge’s child observables. So how should unsubscription be handled inside the first observable’s Observable.OnSubscribe?
combineLatest is a pretty natural approach
final Func0<HashMap<Integer, String>> initialState = () -> new HashMap<>();
final HashMap<Integer, String> output = new HashMap<>();
source.window(1, TimeUnit.SECONDS)
.flatMap(observable -> observable.collect(
initialState,
(mostRecent, data) -> mostRecent.put(data.id, data.content))
)
.map(input -> {
output.putAll(input);
return output;
}).subscribe(reduced -> System.out.println(output));
.map is kind of kludgy; I'm pondering a better way to produce the output dictionary
scan the original source instead of collect, and sample the result.
reduced == output, so you can just use reduced in subscribe.
reduced is the same as output to me, either. The output of each window is a Map. The final step is merging the map emitted by the current window with the map from the previous window.
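The merge step described here can be sketched without RxJava at all. This is only an illustration: WindowMerge and mergeWindows are hypothetical names, and the list of maps stands in for the per-window maps that collect would emit.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of RxJava: folds per-window maps into one running map.
final class WindowMerge {
    static Map<Integer, String> mergeWindows(List<Map<Integer, String>> windows) {
        Map<Integer, String> output = new HashMap<>();
        for (Map<Integer, String> window : windows) {
            // Entries from a later window overwrite entries with the same id.
            output.putAll(window);
        }
        return output;
    }
}
```

Ids that do not appear in a later window keep the value from the earlier one, which is what the putAll in the .map step relies on.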
scan also emits all intermediate values. I'm not sure that bodes well if you are emitting the same mutable data type each time.
return output; If I understand that correctly, it means that in .subscribe(reduced -> ...), reduced is the output variable returned by map.
@Dorus updated the previous gist to use scan, thanks for the input:
source.scan(new HashMap<>(), (output, data) -> {
output.put(data.id, data.content);
return output;
}).sample(1, TimeUnit.SECONDS).subscribe(output -> System.out.println(output));
output.put has side effects, and you are emitting the same collection every time. Since sample might allow the previous operators to run concurrently, output might receive new put calls during the onNext call in subscribe. It might be a good idea to use ConcurrentHashMap or clone the collection somewhere.
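To make the "clone the collection" suggestion concrete, here is a plain-Java sketch of the aliasing problem. It deliberately does not use RxJava: SnapshotScan, scanShared and scanCopied are hypothetical names, and the loop only imitates what a scan-style accumulator does when it emits after every element.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for a scan-style accumulator; none of this is RxJava API.
final class SnapshotScan {
    // Emits the same mutable map after every put, like the scan version above.
    static List<Map<Integer, String>> scanShared(int[] ids, String[] contents) {
        Map<Integer, String> state = new HashMap<>();
        List<Map<Integer, String>> emitted = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            state.put(ids[i], contents[i]);
            emitted.add(state); // every emission aliases one map
        }
        return emitted;
    }

    // Emits an unmodifiable copy after every put, so earlier emissions never change.
    static List<Map<Integer, String>> scanCopied(int[] ids, String[] contents) {
        Map<Integer, String> state = new HashMap<>();
        List<Map<Integer, String>> emitted = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            state.put(ids[i], contents[i]);
            emitted.add(Collections.unmodifiableMap(new HashMap<>(state)));
        }
        return emitted;
    }
}
```

With the shared variant, a consumer that is still reading an earlier emission sees later puts; with the copied variant each emission is a fixed snapshot, at the cost of one map copy per element.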
Sharing is Cheap
If you are certain that an object will never change, then sharing it is simply a matter of providing a reference to it. In Java, sharing mutable objects safely often requires a lot of defensive copying. In the same vein, because we can freely share references to immutable objects, we can likewise intern them for free.
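As a rough illustration of the difference, the sketch below contrasts a holder that must copy defensively with one that can hand out the same reference every time. SharingDemo, MutableTags and ImmutableTags are hypothetical names chosen for this example only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SharingDemo {
    // Mutable internal state: copy on the way in and on every read to stay safe.
    static final class MutableTags {
        private final List<String> tags;

        MutableTags(List<String> tags) {
            this.tags = new ArrayList<>(tags);
        }

        List<String> getTags() {
            return new ArrayList<>(tags); // defensive copy per call
        }
    }

    // Immutable internal state: copy once at construction, then share the reference.
    static final class ImmutableTags {
        private final List<String> tags;

        ImmutableTags(List<String> tags) {
            this.tags = Collections.unmodifiableList(new ArrayList<>(tags));
        }

        List<String> getTags() {
            return tags; // same reference every time, callers cannot modify it
        }
    }
}
```

Every call to MutableTags.getTags allocates a new list, while ImmutableTags.getTags returns one shared, read-only view.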