RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    private final Func2<T, T, R> mergeFunc;

    public LookbackTransformer(Func2<T, T, R> mergeFunc) {
        this.mergeFunc = mergeFunc;
    }

    @Override
    public Observable<R> call(Observable<T> source) {
        return source.window(2, 1).flatMap(pair -> {
            Observable<T> left = pair.elementAt(1);
            Observable<T> right = pair.elementAt(2);
            return left.zipWith(right, mergeFunc);
        });
    }
}
At the end of source, right would never resolve and the final merge would never occur; elementAt throws an IndexOutOfBoundsException :D
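To make the window behaviour concrete, here is a plain-Java sketch (no RxJava dependency) of the windows that a count of 2 and a skip of 1 produce over a finite sequence. The names `SlidingWindows` and `windows` are hypothetical, and the sketch only models the contents of the windows under the assumption that a window opens on each item; it is an illustration, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SlidingWindows {
    // Hypothetical helper: one window per start index, at most `count` items each.
    // Windows near the end of the input come out shorter than `count`.
    static <T> List<List<T>> windows(List<T> items, int count, int skip) {
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += skip) {
            int end = Math.min(start + count, items.size());
            result.add(new ArrayList<>(items.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(windows(Arrays.asList(0, 1, 2), 2, 1)); // [[0, 1], [1, 2], [2]]
    }
}
```

The trailing window holds a single item, so it has nothing at the second position. Note also that elementAt is zero-based, so in a full two-item window the valid indices are 0 and 1.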
public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    private final Func2<T, T, R> mergeFunc;

    public LookbackTransformer(Func2<T, T, R> mergeFunc) {
        this.mergeFunc = mergeFunc;
    }

    @Override
    public Observable<R> call(Observable<T> source) {
        return source.window(2, 1).flatMap(pair -> {
            try {
                Observable<T> left = pair.elementAt(1);
                Observable<T> right = pair.elementAt(2);
                return left.zipWith(right, mergeFunc);
            } catch (IndexOutOfBoundsException e) {
                return Observable.never();
            }
        });
    }
}
left = publish source; right = left.skip(1); result = left.zipWith(right, mergeFunc); left.connect(); return result;
(in pseudocode). You should probably use Observable.create to be able to track the disposable (the Subscription returned by connect()) there.
@Override
public Observable<R> call(Observable<T> source) {
    return Observable.create(new Observable.OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> observer) {
            ConnectableObservable<T> left = source.publish();
            Observable<T> right = left.skip(1);
            Subscription sub = new CompositeSubscription(
                    left.zipWith(right, mergeFunc).subscribe(observer),
                    left.connect());
            // return sub??
        }
    });
}
Alternatively, use publish().refCount(), i.e. source.share(), as left and zip it with left.skip(1).
public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    private final Func2<T, T, R> mergeFunc;

    public LookbackTransformer(Func2<T, T, R> mergeFunc) {
        this.mergeFunc = mergeFunc;
    }

    @Override
    public Observable<R> call(Observable<T> source) {
        Observable<T> offsetSource = source.skip(1);
        return source.zipWith(offsetSource, mergeFunc);
    }
}
compose(lookbackTransformer)
are getting called once
sb 0
is missing and the do
is seen twice per element.
sb 0
sb 0
, as that's (0,1) -> 1
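The repeated side effects can be modeled without RxJava. The sketch below treats a cold source as a `Supplier` that starts a fresh traversal on every call, which is roughly what happens when `source` and `source.skip(1)` each subscribe to a cold Observable. The names `ColdZipSketch`, `counting`, and `lookback` are hypothetical, and the pull-based iterators are an assumption made for illustration; real Observables push items, so the exact counts can differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Supplier;

class ColdZipSketch {
    // Hypothetical cold source: every get() starts a new traversal,
    // and every item handed out bumps the shared side-effect counter.
    static <T> Supplier<Iterator<T>> counting(List<T> data, AtomicInteger counter) {
        return () -> {
            Iterator<T> it = data.iterator();
            return new Iterator<T>() {
                @Override public boolean hasNext() { return it.hasNext(); }
                @Override public T next() { counter.incrementAndGet(); return it.next(); }
            };
        };
    }

    // Pairs each item with its successor using two independent traversals.
    static <T, R> List<R> lookback(Supplier<Iterator<T>> source, BiFunction<T, T, R> merge) {
        Iterator<T> left = source.get();   // first "subscription"
        Iterator<T> right = source.get();  // second "subscription"
        if (right.hasNext()) {
            right.next();                  // models skip(1)
        }
        List<R> out = new ArrayList<>();
        while (left.hasNext() && right.hasNext()) {
            out.add(merge.apply(left.next(), right.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger sideEffects = new AtomicInteger();
        List<Integer> sums = lookback(counting(Arrays.asList(0, 1, 2), sideEffects), (a, b) -> a + b);
        System.out.println(sums);              // [1, 3]
        System.out.println(sideEffects.get()); // 5, more than the 3 source items
    }
}
```

The first result is (0,1) -> 1, and the counter exceeds the number of items because both traversals trigger the side effect. Sharing a single traversal between the two sides is what removes the duplication.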
@Override
public Observable<R> call(Observable<T> source) {
    Observable<T> source2 = source.share();
    Observable<T> offsetSource = source2.skip(1);
    return source2.zipWith(offsetSource, mergeFunc);
}