    Steve Gury
    @stevegury
    @SerialDev this tutorial https://github.com/Froussios/Intro-To-RxJava is pretty good
    Andres Mariscal
    @SerialDev
    @stevegury thank you :)
    David Stemmer
    @weefbellington
    I made a Transformer to be able to do "lookback" without a behavior subject:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
            Observable<T> offsetSource = source.skip(1);
            return source.zipWith(offsetSource, mergeFunc);
        }
    }
    for example it allows you to take a stream returned by interval which emits [1, 2, 3, 4, 5] and map it to [(1,2),(2,3),(3,4),(4,5)]
    is this a wise thing to do or could there be unintended consequences?
    Dorus
    @Dorus
    Is this different from buffer(2, 1)?
    David Stemmer
    @weefbellington
    @Dorus wouldn't buffer yield something like [(1,2),(3,4),(5...)]?
    Dorus
    @Dorus
    not if you use skip 1 instead of skip 2
    David Stemmer
    @weefbellington
    ah interesting. I didn't know you could use it to create overlapping windows, neat
    Dorus
    @Dorus
    It is neat!
    Works both on window() and buffer()
    David Stemmer
    @weefbellington
    the only downside would be you'd have to flatmap it but that's nbd
    Dorus
    @Dorus
    If I had to implement your LookbackTransformer I would do something like <T, R> Observable<R> lookback(Observable<T> source, Func2<T, T, R> merge) { return source.buffer(2, 1).map(e -> merge.call(e.get(0), e.get(1))); }
    (Code is probably all wrong, not writing a whole lot of java lately)
    David Stemmer
    @weefbellington
    yeah that sounds about right to me
    Dorus
    @Dorus
    Actually, small problem: buffer emits a single-element list just before onCompleted. You need to filter those out, or else you get an IndexOutOfBoundsException.
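To make the trailing single-element case concrete, here is a minimal plain-Java sketch that models what buffer(count, skip) emits for a finite source and then applies the "filter out short windows" fix. It deliberately avoids any RxJava dependency; `SlidingPairs`, `windows` and `lookback` are hypothetical names used only for illustration, and the model assumes the source completes normally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of the discussion above. SlidingPairs, windows and
// lookback are hypothetical names, not part of the RxJava API.
final class SlidingPairs {

    // Models what buffer(count, skip) emits for a finite source: a new
    // window starts every `skip` items, and windows cut short by the end
    // of the source are still emitted.
    static <T> List<List<T>> windows(List<T> source, int count, int skip) {
        if (count < 1 || skip < 1) {
            throw new IllegalArgumentException("count and skip must be >= 1");
        }
        List<List<T>> out = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            out.add(new ArrayList<>(source.subList(start, end)));
        }
        return out;
    }

    // Lookback: pair each item with its successor, skipping the short
    // trailing window instead of indexing past its end.
    static <T, R> List<R> lookback(List<T> source, BiFunction<T, T, R> merge) {
        List<R> out = new ArrayList<>();
        for (List<T> w : windows(source, 2, 1)) {
            if (w.size() == 2) {
                out.add(merge.apply(w.get(0), w.get(1)));
            }
        }
        return out;
    }
}
```

For the list [1, 2, 3, 4, 5], windows(src, 2, 1) yields [[1, 2], [2, 3], [3, 4], [4, 5], [5]], while windows(src, 2, 2) yields [[1, 2], [3, 4], [5]]. The size check in lookback plays the role of the filter mentioned above.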
    David Stemmer
    @weefbellington
    you can just use flatmap with window
    Dorus
    @Dorus
    Mm actually, show me, i haven't been able to produce that yet.
    David Stemmer
    @weefbellington
    I'll have to do it later, I'm about to leave
    Dorus
    @Dorus
    no problem, leaving also :)
    David Stemmer
    @weefbellington
    is there anything wrong with zipping an observable with itself though? I thought it was quite clever
    Dorus
    @Dorus
    not really, but if subscribing twice has side effects, you could consider publishing it first
    David Stemmer
    @weefbellington
    well you're not subscribing in the transformer
    Dorus
    @Dorus
    I believe zip subscribes to both sources
    Or does the transformer handle that? Again, something i'm not familiar with
    David Stemmer
    @weefbellington
    @Dorus for the sake of argument, I think this would be how you would do the LookbackTransformer with window instead of buffer:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
            return source.window(2, 1).flatMap(pair -> {
                Observable<T> left = pair.first();
                Observable<T> right = pair.last();
                return left.zipWith(right, mergeFunc);
            });
        }
    }
    well, actually, this still has the single-emission problem
    David Stemmer
    @weefbellington
    probably this would be better:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
            return source.window(2, 1).flatMap(pair -> {
                Observable<T> left = pair.elementAt(0);  // elementAt is zero-based
                Observable<T> right = pair.elementAt(1);
                return left.zipWith(right, mergeFunc);
            });
        }
    }
    that way, if there were only one element in source, right would never resolve and the final merge would never occur
    dammit, that wouldn't work either, elementAt throws an IndexOutOfBoundsException :D
    David Stemmer
    @weefbellington
    last try:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
            return source.window(2, 1).flatMap(pair -> {
                try {
                    Observable<T> left = pair.elementAt(0);  // elementAt is zero-based
                    Observable<T> right = pair.elementAt(1);
                    return left.zipWith(right, mergeFunc);
                } catch (IndexOutOfBoundsException e) {
                    return Observable.never();
                }
            });
        }
    }
    not very elegant
    and I'm not sure it would work
    Dorus
    @Dorus
    Looks right, and since onNext is probably called more often than onComplete, this is more efficient than checking the size of pair. Still, window didn't eliminate the special case for the last element as hoped.
    Dorus
    @Dorus
    Not even sure if you really need window here. Might as well go for
    left = publish source; right = left.skip(1); result = left.zipWith(right, mergeFunc); left.connect(); return result; (in pseudocode). Should probably use Observable.create to be able to track the disposable from connect() there.
    Dorus
    @Dorus
    Mm, I got it this far, but now I can't figure out how to connect the subscription to the Observable.create like you would do in C#.
    @Override
    public Observable<R> call(Observable<T> source) {
        return Observable.create(new Observable.OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> observer) {
                ConnectableObservable<T> left = source.publish();
                Observable<T> right = left.skip(1);
                Subscription sub = new CompositeSubscription(
                        left.zipWith(right, mergeFunc).subscribe(observer),
                        left.connect());
                // return sub??
            }
        });
    }
    David Stemmer
    @weefbellington
    I really don't think the publish subject is necessary
    Dorus
    @Dorus
    If you do something like source.select(slowFunction()).lookback(...), do you want slowFunction() to run twice per element?
    David Stemmer
    @weefbellington
    you're right, I think you can use share()
    which is an alias for publish().refCount()
    iirc
    Dorus
    @Dorus
    *meant map
    yes share sounds perfect
    are we sure it is impossible for elements to emit during subscription?
    David Stemmer
    @weefbellington
    I'm not sure what you mean, originally I was using a Transformer so I wasn't using an onSubscribe function
    Dorus
    @Dorus
    you can still use transformer for this