    Dorus
    @Dorus
    i mean, is it possible for the source to emit an element between source.share() and left.skip(1)?
    because that would be a problem
    David Stemmer
    @weefbellington
    a transformer constructs a new observable before subscription occurs, right?
    David Stemmer
    @weefbellington
    in my understanding, a Transformer takes a source Observable<T> and transforms it into an output Observable<R> at the time the Observable is created. So overlapping emissions shouldn't be a problem.
    Dorus
    @Dorus
    Yeah probably, but i would prefer to be 100% sure. Don't want things like subscribeOn to have an effect here.
    David Stemmer
    @weefbellington
    FYI, the original LookbackTransformer I posted doesn't duplicate mapped functions on the source observable:
    import rx.Observable;
    import rx.functions.Func2;

    // Pairs each emission with the one that follows it, using mergeFunc (RxJava 1.x).
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {

        private final Func2<T, T, R> mergeFunc;

        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }

        @Override
        public Observable<R> call(Observable<T> source) {
            // The same sequence, shifted by one element.
            Observable<T> offsetSource = source.skip(1);
            return source.zipWith(offsetSource, mergeFunc);
        }
    }
    I just tested it and all mapped functions prior to the compose(lookbackTransformer) are getting called once
    Dorus
    @Dorus
    guess you got it right the first time then :D
    David Stemmer
    @weefbellington
    I can't really explain why though
    I'm looking at the internals of the Zip operator
    wow, it's pretty hairy in there, I think I'd need to step through with the debugger to figure out what's going on :)
    today is not that day though
    Dorus
    @Dorus
    yeah i had the same thought when i looked at the C# source once :P
    Dorus
    @Dorus
    ok, this is really wrong. I tried your code with transform, and i don't like the result.
    The output looks like this:
    do 0
    do 0
    do 1
    do 1
    sb 1
    do 2
    sb 2
    sb 0 is missing and the do is seen twice per element.
    David Stemmer
    @weefbellington
    let me try it
    Dorus
    @Dorus
    If i add source.share() it only calls do once
    Still missing sb 0
    Oh right, there is no sb 0, as that's (0,1) -> 1
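The doubled `do` lines are consistent with the source being subscribed twice: once directly by zipWith and once more through skip(1). Below is a minimal plain-Java sketch of that effect, without RxJava. `ColdSourceDemo`, `ColdSource` and `rangeWithSideEffect` are hypothetical stand-ins, and the model is synchronous, so the ordering of the lines differs from the interleaved output above; only the count of `do` lines per element is meant to match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a cold observable: every subscribe() call
// replays the whole sequence, including the upstream side effect.
final class ColdSourceDemo {

    interface ColdSource<T> {
        void subscribe(Consumer<T> downstream);
    }

    // Emits 0..count-1 and logs "do n" per element, like a side effect placed upstream.
    static ColdSource<Integer> rangeWithSideEffect(int count, List<String> log) {
        return downstream -> {
            for (int i = 0; i < count; i++) {
                log.add("do " + i);
                downstream.accept(i);
            }
        };
    }

    // Models zip(source, source.skip(1)) by subscribing twice and pairing the results.
    static List<String> lookbackWithTwoSubscriptions(int count) {
        List<String> log = new ArrayList<>();
        ColdSource<Integer> source = rangeWithSideEffect(count, log);

        List<Integer> left = new ArrayList<>();
        List<Integer> right = new ArrayList<>();
        source.subscribe(left::add);   // first subscription
        source.subscribe(right::add);  // second subscription (the skip(1) branch)

        // Pair element i with element i + 1, which is what skipping one element achieves.
        for (int i = 0; i + 1 < right.size(); i++) {
            log.add("sb " + left.get(i) + " " + right.get(i + 1));
        }
        return log;
    }

    public static void main(String[] args) {
        lookbackWithTwoSubscriptions(3).forEach(System.out::println);
    }
}
```

Each `do n` line shows up twice in the log because the side effect lives upstream of both subscriptions.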
    David Stemmer
    @weefbellington
    yeah, it waits for the second emission until doing the zip function
    Dorus
    @Dorus
    yup
    David Stemmer
    @weefbellington
    I think I was running into a problem with share though where skip() was taking effect on both observables
    Dorus
    @Dorus
    why would skip take effect on both?
    @Override
    public Observable<R> call(Observable<T> source) {
        Observable<T> source2 = source.share();
        Observable<T> offsetSource = source2.skip(1);
        return source2.zipWith(offsetSource, mergeFunc);
    }
    David Stemmer
    @weefbellington
    it really shouldn't, but can you try printing both a and b?
    Dorus
    @Dorus
    yeah i already did. Didn't update the gist
    updated
    David Stemmer
    @weefbellington
    I'm running on Android right now and I don't have an environment set up where I can test Rx functions quickly
    I need to build a regular Java project
    Dorus
    @Dorus
    I just have eclipse open here
    do 0
    do 1
    sb 0 1
    do 2
    sb 1 2
    do 3
    sb 2 3
    That's what i see with share()
    David Stemmer
    @weefbellington
    that seems right, I'm not sure why I wasn't seeing the double emissions in practice
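For comparison, the share() output above is what a one-pass lookback would log: each element is seen once and is paired with its predecessor as soon as one exists. This is a plain-Java sketch of that observable behavior, not of how share() or zip work internally. `SinglePassLookback` and its `lookback` method are hypothetical names, and `mergeFunc` stands in for the Func2 from the transformer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

final class SinglePassLookback {

    // Pairs each element with its predecessor while reading the input exactly once.
    static <T, R> List<R> lookback(Iterable<T> source,
                                   BiFunction<T, T, R> mergeFunc,
                                   List<String> log) {
        List<R> out = new ArrayList<>();
        T previous = null;
        boolean hasPrevious = false;
        for (T current : source) {
            log.add("do " + current);          // upstream side effect, once per element
            if (hasPrevious) {
                R merged = mergeFunc.apply(previous, current);
                log.add("sb " + merged);       // downstream sees the merged pair
                out.add(merged);
            }
            previous = current;
            hasPrevious = true;
        }
        return out;
    }

    // Runs the sketch on 0..3 and returns the log lines.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        lookback(Arrays.asList(0, 1, 2, 3), (a, b) -> a + " " + b, log);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The first element produces no `sb` line because there is nothing to pair it with yet.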
    Dorus
    @Dorus
    I tried to put a Thread.sleep(100); between the lines in call and it doesn't give any problems. Guess that part is synchronized somewhere.
    David Stemmer
    @weefbellington
    thanks for catching that. I really need to set up a minimal Java sandbox environment so I can test this stuff more easily.
    Dorus
    @Dorus
    yeah, it takes some time to figure out all things that are happening within Rx. Some code runs once on creation, some for every subscription, some for every element. If you duplicate your subscription somewhere, you run it twice for every event.
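The three phases mentioned here (creation, subscription, element) can be made visible with counters. This is a rough plain-Java model rather than RxJava itself; `PhaseCounter`, `build` and `run` are hypothetical names, and the "pipeline" is just a function that pushes ints to whoever subscribes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

final class PhaseCounter {
    final AtomicInteger assembled = new AtomicInteger();
    final AtomicInteger subscribed = new AtomicInteger();
    final AtomicInteger emitted = new AtomicInteger();

    // Calling build() models creating the pipeline; calling accept() on the
    // returned function models subscribing to it.
    Consumer<IntConsumer> build(int count) {
        assembled.incrementAndGet();           // runs once, when the pipeline is created
        return downstream -> {
            subscribed.incrementAndGet();      // runs once per subscription
            for (int i = 0; i < count; i++) {
                emitted.incrementAndGet();     // runs once per element, per subscription
                downstream.accept(i);
            }
        };
    }

    // Builds one pipeline of three elements and subscribes to it twice.
    static int[] run() {
        PhaseCounter counter = new PhaseCounter();
        Consumer<IntConsumer> pipeline = counter.build(3);
        pipeline.accept(value -> { });
        pipeline.accept(value -> { });
        return new int[] {
            counter.assembled.get(), counter.subscribed.get(), counter.emitted.get()
        };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " " + counts[1] + " " + counts[2]);
    }
}
```

With two subscriptions the per-element code runs six times for three elements, which is the "twice for every event" effect described above.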
    David Stemmer
    @weefbellington
    I think I see what was happening in my case. The source observable had already been multicasted with share()
    Dorus
    @Dorus
    Yup, that hides the culprit.
    David Stemmer
    @weefbellington
    Thanks again! shame on me for not isolating the component under test
    Dorus
    @Dorus
    Mm, bit annoying that the share in the transformer now affects separate subscriptions too. I wonder if that's avoidable.
    Perhaps with Observable.create?
    David Stemmer
    @weefbellington
    it sounds like you're describing something like a defensive copy
    Dorus
    @Dorus
    @Override
    public Observable<R> call(Observable<T> source2) {
        return Observable.create(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> t) {
                Observable<T> source = source2.share();
                Observable<T> offsetSource = source.skip(1);
                source.zipWith(offsetSource, mergeFunc).subscribe(t);
            }
        });
    }
    That works
    Feels a little funky
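The idea behind the Observable.create wrapper is that the shared state gets built inside the subscribe call, so every downstream subscriber gets its own copy. Here is a plain-Java sketch of that design choice, without RxJava; `PerSubscriberLookback`, `Source` and `run` are hypothetical names, and a one-slot list stands in for whatever state the real operators keep.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

final class PerSubscriberLookback {

    // Hypothetical stand-in for an observable: subscribe() pushes elements to the consumer.
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    // The lookback state is created inside subscribe(), so it exists once per
    // downstream subscriber instead of once for the whole pipeline.
    static <T, R> Source<R> lookback(Source<T> upstream, BiFunction<T, T, R> mergeFunc) {
        return downstream -> {
            List<T> previous = new ArrayList<>(1);   // fresh state for this subscriber
            upstream.subscribe(current -> {
                if (previous.isEmpty()) {
                    previous.add(current);           // first element: nothing to pair yet
                    return;
                }
                downstream.accept(mergeFunc.apply(previous.get(0), current));
                previous.set(0, current);
            });
        };
    }

    // Subscribes two independent consumers, A and B, and returns what happened.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Source<Integer> upstream = downstream -> {
            log.add("subscribe");
            for (int value : Arrays.asList(0, 1, 2)) {
                downstream.accept(value);
            }
        };
        Source<String> pairs = lookback(upstream, (a, b) -> a + " " + b);
        pairs.subscribe(pair -> log.add("A " + pair));
        pairs.subscribe(pair -> log.add("B " + pair));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each subscriber triggers exactly one upstream subscription and receives the full sequence of pairs, independent of the other.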
    David Stemmer
    @weefbellington
    I don't see anything easier. It looks like asObservable just hides the type; it doesn't create a copy that forwards the results to another subscriber
    Dorus
    @Dorus
    I'm trying to write it using lambdas, it looks quite a bit cleaner.