    David Stemmer
    @weefbellington
    probably this would be better:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
            return source.window(2, 1).flatMap(pair -> {
                // elementAt is zero-based: 0 is the first item of the window, 1 the second
                Observable<T> left = pair.elementAt(0);
                Observable<T> right = pair.elementAt(1);
                return left.zipWith(right, mergeFunc);
            });
        }
    }
    that way, if there were only one element in source, right would never resolve and the final merge would never occur
    dammit, that wouldn't work either, elementAt throws an IndexOutOfBoundsException :D
    David Stemmer
    @weefbellington
    last try:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
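            return source.window(2, 1).flatMap(pair -> {
                try {
                    // elementAt is zero-based: 0 is the first item of the window, 1 the second
                    Observable<T> left = pair.elementAt(0);
                    Observable<T> right = pair.elementAt(1);
                    return left.zipWith(right, mergeFunc);
                } catch (IndexOutOfBoundsException e) {
                    // caveat: as far as I know a too-short window is reported through onError
                    // at subscription time, so this catch may never run
                    return Observable.never();
                }
            });
        }
    }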
    not very elegant
    and I'm not sure it would work
    Dorus
    @Dorus
    Looks right, and since onNext is probably called more often than onComplete, this is more efficient than checking the size of pair. Still, window didn't eliminate the special case for the last element as hoped.
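For illustration, the last-element special case can be sketched without RxJava. The following is a minimal plain-Java stand-in that assumes window(2, 1) opens a new window at every item and lets the final window run short; the class and method names (SlidingWindows, windows) are hypothetical, not part of RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SlidingWindows {

    // Plain-list stand-in for window(count, skip): a new window opens every
    // `skip` items and holds at most `count` items, so windows near the end
    // of the input may be shorter than `count`.
    static <T> List<List<T>> windows(List<T> items, int count, int skip) {
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += skip) {
            int end = Math.min(start + count, items.size());
            result.add(new ArrayList<>(items.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> w = windows(Arrays.asList(0, 1, 2), 2, 1);
        // The last window holds a single item, so it has no element at index 1.
        System.out.println(w); // prints [[0, 1], [1, 2], [2]]
    }
}
```

Under that assumption the trailing one-item window is what makes a lookup of the second element fail, which is why the special case survives the switch to window.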
    Dorus
    @Dorus
    Not even sure if you really need window here. Might as well go for
    left = publish source; right = left.skip(1); result = left.zipWith(right, mergeFunc); left.connect(); return result; (in pseudocode). Should probably use Observable.create to be able to track the disposable from connect() there.
    Dorus
    @Dorus
    Mm, I got it this far, but now I can't figure out how to connect the subscription to the Observable.create like you would do in C#.
    @Override
    public Observable<R> call(Observable<T> source) {
        return Observable.create(new Observable.OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> observer) {
                ConnectableObservable<T> left = source.publish();
                Observable<T> right = left.skip(1);
                Subscription sub = new CompositeSubscription(
                        left.zipWith(right, mergeFunc).subscribe(observer),
                        left.connect());
                // return sub??
            }
        });
    }
    David Stemmer
    @weefbellington
    I really don't think the publish subject is necessary
    Dorus
    @Dorus
    If you do something like source.select(slowFunction()).lookback(...), do you want slowFunction() to run twice per element?
    David Stemmer
    @weefbellington
    you're right, I think you can use share()
    which is an alias for publish().refcount()
    iirc
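The worry about slowFunction() running twice can be illustrated with a loose plain-Java analogy. This is only a sketch: it models a cold source as a Supplier that starts a fresh pass on every get(), which is an assumption about how an unshared source behaves when it is subscribed twice, and the names ColdSourceDemo, coldSource and lookback are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class ColdSourceDemo {

    // Counts how often the (hypothetical) slow mapping function runs.
    static final AtomicInteger calls = new AtomicInteger();

    static int slowFunction(int x) {
        calls.incrementAndGet();
        return x * 10;
    }

    // A "cold" source: every get() starts a fresh pass that re-runs slowFunction.
    static Supplier<Iterator<Integer>> coldSource(List<Integer> input) {
        return () -> new Iterator<Integer>() {
            private int index = 0;

            @Override
            public boolean hasNext() {
                return index < input.size();
            }

            @Override
            public Integer next() {
                return slowFunction(input.get(index++));
            }
        };
    }

    // Pairs each item with its successor by consuming the source twice,
    // the second pass offset by one item (similar to zipping with skip(1)).
    static List<String> lookback(Supplier<Iterator<Integer>> source) {
        Iterator<Integer> left = source.get();
        Iterator<Integer> right = source.get();
        if (right.hasNext()) {
            right.next(); // plays the role of skip(1)
        }
        List<String> pairs = new ArrayList<>();
        while (left.hasNext() && right.hasNext()) {
            pairs.add(left.next() + "," + right.next());
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> pairs = lookback(coldSource(Arrays.asList(0, 1, 2)));
        System.out.println(pairs);                               // prints [0,10, 10,20]
        System.out.println("slowFunction calls: " + calls.get()); // prints slowFunction calls: 5
    }
}
```

Three input items cost five calls here because both passes do the mapping work themselves. Sharing the source is meant to remove exactly that duplication.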
    Dorus
    @Dorus
    *mean map
    yes share sounds perfect
    are we sure it is impossible for elements to emit during subscription?
    David Stemmer
    @weefbellington
    I'm not sure what you mean, originally I was using a Transformer so I wasn't using an onSubscribe function
    Dorus
    @Dorus
    you can still use transformer for this
    i mean, is it possible for the source to emit an element between source.share() and left.skip(1)
    because that would be a problem
    David Stemmer
    @weefbellington
    a transformer constructs a new observable before subscription occurs, right?
    David Stemmer
    @weefbellington
    in my understanding, a Transformer takes a source Observable<T> and transforms it into an output Observable<R>, at the time that the Observable is created. So overlapping emissions shouldn't be a problem.
    Dorus
    @Dorus
    Yeah probably, but I would prefer to be 100% sure. Don't want things like subscribeOn to have an effect here.
    David Stemmer
    @weefbellington
    FYI, the original LookbackTransformer I posted doesn't duplicate mapped functions on the source observable:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
            Observable<T> offsetSource = source.skip(1);
            return source.zipWith(offsetSource, mergeFunc);
        }
    }
    I just tested it and all mapped functions prior to the compose(lookbackTransformer) are getting called once
    Dorus
    @Dorus
    guess you got it right the first time then :D
    David Stemmer
    @weefbellington
    I can't really explain why though
    I'm looking at the internals of the Zip operator
    wow, it's pretty hairy in there, I think I'd need to step through with the debugger to figure out what's going on :)
    today is not that day though
    Dorus
    @Dorus
    yeah i had the same thought when i looked at the C# source once :P
    Dorus
    @Dorus
    ok, this is really wrong. I tried your code with transform, and I don't like the result.
    The output looks like this:
    do 0
    do 0
    do 1
    do 1
    sb 1
    do 2
    sb 2
    sb 0 is missing and the do is seen twice per element.
    David Stemmer
    @weefbellington
    let me try it
    Dorus
    @Dorus
    If i add source.share() it only calls do once
    Still missing sb 0
    Oh right, there is no sb 0, as that's (0,1) -> 1
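The missing "sb 0" can be checked with a list-based sketch of the same pairing. This is not RxJava: PairwiseDemo and lookback are hypothetical names, and the merge function that keeps the newer item of each pair is an assumption read off the "(0,1) -> 1" remark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class PairwiseDemo {

    // List-based analogue of source.zipWith(source.skip(1), mergeFunc):
    // n items produce n - 1 merged results, and a lone item produces none.
    static <T, R> List<R> lookback(List<T> items, BiFunction<T, T, R> mergeFunc) {
        List<R> out = new ArrayList<>();
        for (int i = 0; i + 1 < items.size(); i++) {
            out.add(mergeFunc.apply(items.get(i), items.get(i + 1)));
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed merge function: keep the newer item of each pair.
        List<Integer> merged = lookback(Arrays.asList(0, 1, 2), (prev, next) -> next);
        System.out.println(merged); // prints [1, 2]

        List<Integer> single = lookback(Arrays.asList(0), (prev, next) -> next);
        System.out.println(single); // prints []
    }
}
```

The first result only appears once the second item has arrived, so nothing is ever produced for item 0 on its own.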
    David Stemmer
    @weefbellington
    yeah, it waits for the second emission until doing the zip function
    Dorus
    @Dorus
    yup
    David Stemmer
    @weefbellington
    I think I was running into a problem with share though where skip() was taking effect on both observables
    Dorus
    @Dorus
    why would skip take effect on both?
    @Override
    public Observable<R> call(Observable<T> source) {
        Observable<T> source2 = source.share();
        Observable<T> offsetSource = source2.skip(1);
        return source2.zipWith(offsetSource, mergeFunc);
    }
    David Stemmer
    @weefbellington
    it really shouldn't, but can you try printing both a and b?