RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
sb 0
sb 0
, as that's (0,1) -> 1
/**
 * Pairs every item of {@code source} with the item that follows it and emits
 * {@code mergeFunc.call(item, nextItem)} for each such pair.
 *
 * <p>The upstream is multicast with {@code publish(selector)} instead of
 * {@code share()}. {@code share()} is {@code publish().refCount()}, which
 * connects to the upstream as soon as the first of zip's two subscriptions
 * arrives; a synchronous source can therefore start (or even finish) emitting
 * before the {@code skip(1)} branch is subscribed. {@code publish(selector)}
 * wires up both branches first and only then connects, so both branches
 * observe the same single run of the source.
 *
 * <p>The multicast is set up once per downstream subscriber, so each
 * subscriber triggers its own subscription to {@code source}.
 *
 * @param source the upstream sequence
 * @return a sequence of merged (item, next item) pairs; one element shorter
 *         than {@code source}, and empty when {@code source} has fewer than
 *         two items
 */
@Override
public Observable<R> call(Observable<T> source) {
    // "shared" is the multicast view of source; skip(1) yields the same
    // sequence shifted by one so zip lines up (n, n + 1).
    return source.publish(shared -> shared.zipWith(shared.skip(1), mergeFunc));
}
/**
 * Pairs every item of {@code source2} with the item that follows it and emits
 * {@code mergeFunc.call(item, nextItem)} for each such pair.
 *
 * <p>Replaces the hand-written {@code Observable.create(OnSubscribe)} wrapper
 * around {@code share()} with {@code publish(selector)}:
 * <ul>
 *   <li>{@code publish(selector)} already performs its wiring once per
 *       downstream subscriber, so no explicit deferral is needed;</li>
 *   <li>{@code share()} ({@code publish().refCount()}) connects to the
 *       upstream when zip's first subscription arrives, i.e. before the
 *       {@code skip(1)} branch is attached, so a synchronous source may be
 *       consumed twice or have items missed by the second branch.
 *       {@code publish(selector)} connects only after both branches are
 *       subscribed.</li>
 * </ul>
 *
 * @param source2 the upstream sequence
 * @return a sequence of merged (item, next item) pairs; one element shorter
 *         than {@code source2}, and empty when {@code source2} has fewer than
 *         two items
 */
@Override
public Observable<R> call(Observable<T> source2) {
    // "shared" is the multicast view of source2; skip(1) yields the same
    // sequence shifted by one so zip lines up (n, n + 1).
    return source2.publish(shared -> shared.zipWith(shared.skip(1), mergeFunc));
}
/**
 * Creates a transformer that combines each upstream item with its successor.
 *
 * <p>For an upstream emitting {@code a, b, c} the transformed sequence emits
 * {@code mergeFunc.call(a, b)} followed by {@code mergeFunc.call(b, c)}.
 *
 * <p>The upstream is multicast with {@code publish(selector)} rather than
 * {@code share()}. {@code share()} ({@code publish().refCount()}) connects as
 * soon as zip's first subscription arrives, before the {@code skip(1)} branch
 * is attached, so a synchronous source may be run twice or have items missed
 * by the second branch. {@code publish(selector)} subscribes both branches
 * and only then connects, once per downstream subscriber.
 *
 * @param mergeFunc combines an item (first argument) with the item that
 *                  follows it (second argument)
 * @param <T> upstream element type
 * @param <R> type of the merged result
 * @return a transformer producing one merged value per adjacent pair; the
 *         result is empty when the upstream has fewer than two items
 */
static <T, R> Transformer<T, R> lookback(Func2<T, T, R> mergeFunc) {
    // "shared" is the multicast view of the upstream; skip(1) yields the same
    // sequence shifted by one so zip lines up (n, n + 1).
    return source -> source.publish(shared -> shared.zipWith(shared.skip(1), mergeFunc));
}