Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Dorus
    @Dorus
    Still missing sb 0
    Oh right, there is no sb 0, as that's (0,1) -> 1
    David Stemmer
    @weefbellington
    yeah, it waits for the second emission until doing the zip function
    Dorus
    @Dorus
    yup
    David Stemmer
    @weefbellington
    I think I was running into a problem with share though where skip() was taking effect on both observables
    Dorus
    @Dorus
    why would skip effect on both?
    @Override
    public Observable<R> call(Observable<T> source) {
        Observable<T> source2 = source.share();
        Observable<T> offsetSource = source2.skip(1);
        return source2.zipWith(offsetSource, mergeFunc);
    }
    David Stemmer
    @weefbellington
    it really shouldn't, but can you try printing both a and b?
    Dorus
    @Dorus
    yeah i already did. Didn't update the gist
    updated
    David Stemmer
    @weefbellington
    I'm running on Android right now and I don't have an environment set up where I can test Rx functions quickly
    I need to build a regular Java project
    Dorus
    @Dorus
    I just have eclipse open here
    do 0
    do 1
    sb 0 1
    do 2
    sb 1 2
    do 3
    sb 2 3
    Thats what i see with share()
    David Stemmer
    @weefbellington
    that seems right, I'm not sure why I wasn't seeing the double emissions in practice
    Dorus
    @Dorus
    I tried to put a Thread.sleep(100); between the lines in call and it doesn't give any problems. Guess that part is synchronized somewhere.
    David Stemmer
    @weefbellington
    thanks for catching that. I really need to set up a minimal Java sandbox environment so I can test this stuff more easily.
    Dorus
    @Dorus
    yeah, it takes some time to figure out all things that are happening within Rx. Some code runs once on creation, some for every subscription, some for every element. If you duplicate your subscription somewhere, you run it twice for every event.
    David Stemmer
    @weefbellington
    I think I see what was happening in my case. The source observable had already been multicasted with share()
    Dorus
    @Dorus
    Yup, that hides the culprit.
    David Stemmer
    @weefbellington
    Thanks again! shame on me for not isolating the component under test
    Dorus
    @Dorus
    Mm, bit annoying that the share in the transformer now effect separate subscriptions also. I wonder if that's avoidable.
    Perhaps with observable.create?
    David Stemmer
    @weefbellington
    it sounds like you're describing something like a defensive copy
    Dorus
    @Dorus
    @Override
    public Observable<R> call(Observable<T> source2) {
        return Observable.create(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> t) {
                Observable<T> source = source2.share();
                Observable<T> offsetSource = source.skip(1);
                source.zipWith(offsetSource, mergeFunc).subscribe(t);
            }
        });
    }
    That works
    Feels a little funky
    David Stemmer
    @weefbellington
    I don't see anything easier, it looks like toObservable just hides the type, it doesn't create a copy that forwards the results to another subscriber
    Dorus
    @Dorus
    I'm trying to write it using Lambdas, looks quite a bit cleaner.
    static <T, R> Transformer<T, R> lookback(Func2<T, T, R> mergeFunc) {  
    return source -> Observable.create(result -> {
            Observable<T> left = source.share();
            Observable<T> right = left.skip(1);
            left.zipWith(right, mergeFunc).subscribe(result);
    });
    }
    David Stemmer
    @weefbellington
    good call on the not propagating the multicasting to downstream subscribers...I think it would be good to get a unit test around this
    if you're curious, what I'm using this for is to take a stream of video positions, [0, 60, 120, 150...] and turn it into a stream of video intervals, [ (0,60), (60, 120), (120, 150) ... ], and then log the start and end time of each interval
    Dorus
    @Dorus
    interesting usecase
    David Stemmer
    @weefbellington
    the stream can be started and stopped when the video is paused/played
    well, technically, the stream is completed on pause, and a new stream is created when playback begins again
    I really didn't want to do an awful stateful thing where I was caching the previous timestamp in a mapping function or something
    Dorus
    @Dorus
    you could use scan
    but then you need to make a tuple each time that keeps the last two values
    The mergfunction is much more elegant
    David Stemmer
    @weefbellington
    yeah I considered a scan/reduce with an accumulator
    but it felt weird
    stuffing a lot of state into an accumulator function is a code smell to me
    Dorus
    @Dorus
    yeah, and you also keep more state than you strictly need.
    indeed
    i want a scan that emit two values, one for the next iteration and one to send downstream
    you can wrap it in a tuple, but then you are constantly creating new tuples, as well as keeping the previous element until the next element arrives.
    David Stemmer
    @weefbellington
    I think accumulators that accumulate too much state should be called "katamari accumulators" :)
    Srepfler Srdan
    @schrepfler
    do people convert Twitter's Future to Observables? Are there utility methods for it?
    David Stemmer
    @weefbellington
    looking for suggestions: when you write a custom Observable, Operator or Transformer, where do they live?
    Simon Baslé
    @simonbasle
    in a project using Rx you mean?