These are chat archives for ReactiveX/RxJava

13th
Oct 2015
David Stemmer
@weefbellington
Oct 13 2015 14:14
@Dorus for the sake of argument, I think this would be how you would do the LookbackTransformer with window instead of buffer:
public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {

    private final Func2<T, T, R> mergeFunc;

    public LookbackTransformer(Func2<T, T, R> mergeFunc) {
        this.mergeFunc = mergeFunc;
    }

    @Override
    public Observable<R> call(Observable<T> source) {
        return source.window(2, 1).flatMap(pair -> {
            Observable<T> left = pair.first();
            Observable<T> right = pair.last();
            return left.zipWith(right, mergeFunc);
        }
    }
}
well, actually, this still has the single-emission problem
David Stemmer
@weefbellington
Oct 13 2015 14:19
probably this would be better:
public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {

    private final Func2<T, T, R> mergeFunc;

    public LookbackTransformer(Func2<T, T, R> mergeFunc) {
        this.mergeFunc = mergeFunc;
    }

    @Override
    public Observable<R> call(Observable<T> source) {
        return source.window(2, 1).flatMap(pair -> {
            Observable<T> left = pair.elementAt(1);
            Observable<T> right = pair.elementAt(2);
            return left.zipWith(right, mergeFunc);
        }
    };
}
that way, if there were only one element in source, right would never resolve and the final merge would never occur
dammit, that wouldn't work either, elementAt throws an IndexOutOfBoundsException :D
David Stemmer
@weefbellington
Oct 13 2015 14:26
last try:
public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {

    private final Func2<T, T, R> mergeFunc;

    public LookbackTransformer(Func2<T, T, R> mergeFunc) {
        this.mergeFunc = mergeFunc;
    }

    @Override
    public Observable<R> call(Observable<T> source) {
        return source.window(2, 1).flatMap(pair -> {
            try {
                Observable<T> left = pair.elementAt(1);
                Observable<T> right = pair.elementAt(2);
                return left.zipWith(right, mergeFunc);
            } catch (IndexOutOfBoundsException e) {
                return Observable.never();
            }
        }
    };
not very elegant
and I'm not sure it would work
Dorus
@Dorus
Oct 13 2015 14:59
Looks right, and since onNext is probably called more often than onComplete, this is more efficient than checking the size of pair. Still, window didn't eliminate the special case for the last element as hoped.
Dorus
@Dorus
Oct 13 2015 15:11
Not even sure if you really need window here. Might as well go for
left = publish source; right = left.skip(1); result = left.zipWith(right, mergFunc); left.connect(); return result;(in speudo code). Should probably use observable.create to be able to track the disposable from connect() there.
Dorus
@Dorus
Oct 13 2015 15:44
Mm, i got it this far, but now i cant figure out how to connect the subscription to the observable.create like you would do in C#.
@Override
public Observable<R> call(Observable<T> source) {
    return Observable.create(new Observable.OnSubscribe<R>() {
    @Override
    public void call(Subscriber<? super R> observer) {
        ConnectableObservable<T> left = source.publish();
        Observable<T> right = left.skip(1);
        Subscription sub = new CompositeSubscription(
                left.zipWith(right, mergeFunc).subscribe(observer), 
                left.connect());
        // return sub??
    }
 }) ;
}
David Stemmer
@weefbellington
Oct 13 2015 16:03
I really don't think the publish subject is necessary
Dorus
@Dorus
Oct 13 2015 16:08
If you do something like source.select(slowFunction()).lookback(...), do you want slowFunction() to run twice per element?
David Stemmer
@weefbellington
Oct 13 2015 16:09
you're right, I think you can use share()
which is an alias for publish().refcount()
iirc
Dorus
@Dorus
Oct 13 2015 16:13
*mean map
yes share sounds perfect
are we sure it is impossible for elements to emit during subscription?
David Stemmer
@weefbellington
Oct 13 2015 17:28
I'm not sure what you mean, originally I was using a Transformer so I wasn't using an onSubscribe function
Dorus
@Dorus
Oct 13 2015 17:29
you can still use transformer for this
i mean, is it possibel for the source to emit a element between source.share() and left.skip(1)
because that would be a problem
David Stemmer
@weefbellington
Oct 13 2015 17:36
a transformer constructs a new observable before subscription occurs, right?
David Stemmer
@weefbellington
Oct 13 2015 17:41
in my understanding, a Transformer takes a a source Observable<T> and transforms it into an output Observable<R>, at the time that the Observable is created. So overlapping emissions shouldn't be a problem.
Dorus
@Dorus
Oct 13 2015 17:51
Yeah probably, but i would prefer to be 100% sure. Dont want things like subscribeOn have a effect here.
David Stemmer
@weefbellington
Oct 13 2015 17:55
FYI, the original LookbackTranformer I posted doesn't duplicate mapped functions on the source observable:
public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {

    private final Func2<T, T, R> mergeFunc;

    public LookbackTransformer(Func2<T, T, R> mergeFunc) {
        this.mergeFunc = mergeFunc;
    }

    @Override
    public Observable<R> call(Observable<T> source) {
        Observable<T> offsetSource = source.skip(1);
        return source.zipWith(offsetSource, mergeFunc);
    }
}
I just tested it and all mapped functions prior to the compose(lookbackTransformer) are getting called once
Dorus
@Dorus
Oct 13 2015 17:56
guess you got it right the first time then :D
David Stemmer
@weefbellington
Oct 13 2015 17:59
I can't really explain why though
I'm looking at the internals of the Zip operator
wow, it's pretty hairy in there, I think I'd need to step through with the debugger to figure out what's going on :)
today is not that day though
Dorus
@Dorus
Oct 13 2015 18:09
yeah i had the same thought when i looked at the C# source once :P
Dorus
@Dorus
Oct 13 2015 18:22
ok, this is really wrong. I tried your code with transform, and i dont like the result.
The output looks like this:
do 0
do 0
do 1
do 1
sb 1
do 2
sb 2
sb 0 is missing and the do is seen twice per element.
David Stemmer
@weefbellington
Oct 13 2015 18:23
This message was deleted
let me try it
Dorus
@Dorus
Oct 13 2015 18:29
If i add source.share() it only calls do once
Still missing sb 0
Oh right, there is no sb 0, as that's (0,1) -> 1
David Stemmer
@weefbellington
Oct 13 2015 18:31
yeah, it waits for the second emission until doing the zip function
Dorus
@Dorus
Oct 13 2015 18:31
yup
David Stemmer
@weefbellington
Oct 13 2015 18:31
I think I was running into a problem with share though where skip() was taking effect on both observables
Dorus
@Dorus
Oct 13 2015 18:31
why would skip effect on both?
@Override
public Observable<R> call(Observable<T> source) {
    Observable<T> source2 = source.share();
    Observable<T> offsetSource = source2.skip(1);
    return source2.zipWith(offsetSource, mergeFunc);
}
David Stemmer
@weefbellington
Oct 13 2015 18:34
it really shouldn't, but can you try printing both a and b?
Dorus
@Dorus
Oct 13 2015 18:34
yeah i already did. Didn't update the gist
updated
David Stemmer
@weefbellington
Oct 13 2015 18:35
I'm running on Android right now and I don't have an environment set up where I can test Rx functions quickly
I need to build a regular Java project
Dorus
@Dorus
Oct 13 2015 18:35
I just have eclipse open here
do 0
do 1
sb 0 1
do 2
sb 1 2
do 3
sb 2 3
Thats what i see with share()
David Stemmer
@weefbellington
Oct 13 2015 18:37
that seems right, I'm not sure why I wasn't seeing the double emissions in practice
Dorus
@Dorus
Oct 13 2015 18:42
I tried to put a Thread.sleep(100); between the lines in call and it doesn't give any problems. Guess that part is synchronized somewhere.
David Stemmer
@weefbellington
Oct 13 2015 18:48
thanks for catching that. I really need to set up a minimal Java sandbox environment so I can test this stuff more easily.
Dorus
@Dorus
Oct 13 2015 18:51
yeah, it takes some time to figure out all things that are happening within Rx. Some code runs once on creation, some for every subscription, some for every element. If you duplicate your subscription somewhere, you run it twice for every event.
David Stemmer
@weefbellington
Oct 13 2015 18:52
I think I see what was happening in my case. The source observable had already been multicasted with share()
Dorus
@Dorus
Oct 13 2015 18:55
Yup, that hides the culprit.
David Stemmer
@weefbellington
Oct 13 2015 19:10
Thanks again! shame on me for not isolating the component under test
Dorus
@Dorus
Oct 13 2015 19:30
Mm, bit annoying that the share in the transformer now effect separate subscriptions also. I wonder if that's avoidable.
Perhaps with observable.create?
David Stemmer
@weefbellington
Oct 13 2015 19:32
it sounds like you're describing something like a defensive copy
Dorus
@Dorus
Oct 13 2015 19:34
@Override
public Observable<R> call(Observable<T> source2) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> t) {
            Observable<T> source = source2.share();
            Observable<T> offsetSource = source.skip(1);
            source.zipWith(offsetSource, mergeFunc).subscribe(t);
        }
    });
}
That works
Feels a little funky
David Stemmer
@weefbellington
Oct 13 2015 19:43
I don't see anything easier, it looks like toObservable just hides the type, it doesn't create a copy that forwards the results to another subscriber
Dorus
@Dorus
Oct 13 2015 19:43
I'm trying to write it using Lambdas, looks quite a bit cleaner.
static <T, R> Transformer<T, R> lookback(Func2<T, T, R> mergeFunc) {  
return source -> Observable.create(result -> {
        Observable<T> left = source.share();
        Observable<T> right = left.skip(1);
        left.zipWith(right, mergeFunc).subscribe(result);
});
}
David Stemmer
@weefbellington
Oct 13 2015 19:56
good call on the not propagating the multicasting to downstream subscribers...I think it would be good to get a unit test around this
if you're curious, what I'm using this for is to take a stream of video positions, [0, 60, 120, 150...] and turn it into a stream of video intervals, [ (0,60), (60, 120), (120, 150) ... ], and then log the start and end time of each interval
Dorus
@Dorus
Oct 13 2015 20:00
interesting usecase
David Stemmer
@weefbellington
Oct 13 2015 20:00
the stream can be started and stopped when the video is paused/played
well, technically, the stream is completed on pause, and a new stream is created when playback begins again
I really didn't want to do an awful stateful thing where I was caching the previous timestamp in a mapping function or something
Dorus
@Dorus
Oct 13 2015 20:02
you could use scan
but then you need to make a tuple each time that keeps the last two values
The mergfunction is much more elegant
David Stemmer
@weefbellington
Oct 13 2015 20:03
yeah I considered a scan/reduce with an accumulator
but it felt weird
stuffing a lot of state into an accumulator function is a code smell to me
Dorus
@Dorus
Oct 13 2015 20:05
yeah, and you also keep more state than you strictly need.
indeed
i want a scan that emit two values, one for the next iteration and one to send downstream
you can wrap it in a tuple, but then you are constantly creating new tuples, as well as keeping the previous element until the next element arrives.
David Stemmer
@weefbellington
Oct 13 2015 20:08
I think accumulators that accumulate too much state should be called "katamari accumulators" :)