These are chat archives for ReactiveX/RxJava

2nd
May 2016
dwursteisen
@dwursteisen
May 02 2016 12:23

Hello, I have an issue with combineLatest. I understand the issue, but can't find an elegant alternative.

        Observable<Integer> data = Observable.just(1, 2, 3, 4, 5);
        Observable<Long> wait = Observable.timer(1, TimeUnit.SECONDS);

        // prints only: 5->0
        // expected:
        // 1->0
        // 2->0
        // 3->0
        // 4->0
        // 5->0
        Observable.combineLatest(data, wait, (a, b) -> a + "->" + b).subscribe(System.out::println);

I want to combine each element of my first Observable with the one from the second. How would you proceed?

Dorus
@Dorus
May 02 2016 13:03
@dwursteisen Sounds like you want withLatestFrom instead of combineLatest.
Or do you want a 1-1 mapping? Then you would need zip.
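To make the difference between the three operators concrete, here is a minimal plain-Java sketch that models their pairing rules on one ordered timeline. It does not use RxJava: the class `PairingModel`, its method names, and the `{stream, value}` timeline encoding are hypothetical, and completion and threading are ignored. The timeline assumes the scenario from the question, where data emits 1 to 5 immediately and wait emits 0 afterwards.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model (no RxJava dependency) of how three operators pair values.
// A timeline is a list of {stream, value} entries in emission order.
final class PairingModel {

    // Assumed scenario: data emits 1..5 at once, wait emits 0 afterwards.
    static List<String[]> chatTimeline() {
        List<String[]> t = new ArrayList<>();
        for (int i = 1; i <= 5; i++) t.add(new String[] {"data", String.valueOf(i)});
        t.add(new String[] {"wait", "0"});
        return t;
    }

    // combineLatest: emits on every event, once both streams have a value.
    static List<String> combineLatest(List<String[]> timeline) {
        List<String> out = new ArrayList<>();
        String a = null, b = null;
        for (String[] e : timeline) {
            if (e[0].equals("data")) a = e[1]; else b = e[1];
            if (a != null && b != null) out.add(a + "->" + b);
        }
        return out;
    }

    // withLatestFrom: emits only when data emits, and only if wait already has a value.
    static List<String> withLatestFrom(List<String[]> timeline) {
        List<String> out = new ArrayList<>();
        String b = null;
        for (String[] e : timeline) {
            if (e[0].equals("wait")) b = e[1];
            else if (b != null) out.add(e[1] + "->" + b);
        }
        return out;
    }

    // zip: pairs the n-th data value with the n-th wait value.
    static List<String> zip(List<String[]> timeline) {
        List<String> out = new ArrayList<>();
        Deque<String> as = new ArrayDeque<>(), bs = new ArrayDeque<>();
        for (String[] e : timeline) {
            (e[0].equals("data") ? as : bs).add(e[1]);
            if (!as.isEmpty() && !bs.isEmpty()) out.add(as.poll() + "->" + bs.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(combineLatest(chatTimeline()));  // [5->0]
        System.out.println(withLatestFrom(chatTimeline())); // []
        System.out.println(zip(chatTimeline()));            // [1->0]
    }
}
```

In this model none of the three rules yields the five expected pairs for the question's timeline, which is the gap the rest of the conversation tries to close.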
dwursteisen
@dwursteisen
May 02 2016 13:05
That's what we thought.
Dorus
@Dorus
May 02 2016 13:06
If you use withLatestFrom you can add a default value to the right stream by adding startWith to it.
dwursteisen
@dwursteisen
May 02 2016 13:06
But in this case, withLatestFrom won't combine, as the wait observable won't emit before the end of the data Observable.
Dorus
@Dorus
May 02 2016 13:07
Let me guess, your data observable is cold, but the wait one is hot?
dwursteisen
@dwursteisen
May 02 2016 13:07
With startWith, we'd combine with that default element.
Both are cold.
Dorus
@Dorus
May 02 2016 13:08
You can also do wait.take(1).flatMap(w -> data, (w, d) -> d + "->" + w)
dwursteisen
@dwursteisen
May 02 2016 13:09
That's my plan B, but in this case data will be subscribed several times (which is OK with an Observable.just). But in our case, the data Observable can be a network call.
I may want to use replay(1).autoConnect() in this case, but I'm not a big fan of this.
Dorus
@Dorus
May 02 2016 13:14
So if i understand you right, you want a withLatestFrom scenario where you wait for the inner observable to yield before the outer gets subscribed.
I'm a bit rusty on RxJava, but let me try:
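<T, S, R> Observable.Transformer<T, R> withLatestFromWait(Observable<S> other, Func2<T, S, R> combiner) {
  return source -> Observable.create(o -> {
    ConnectableObservable<T> s1 = source.publish();
    ConnectableObservable<S> s2 = other.publish();
    // Wire up the downstream first; nothing flows until connect() is called.
    s1.withLatestFrom(s2, combiner).subscribe(o);
    // Connect the source only once `other` has produced its first item.
    o.add(s2.take(1).subscribe(__ -> o.add(s1.connect())));
    o.add(s2.connect());
  });
}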
Dorus
@Dorus
May 02 2016 13:22
@dwursteisen
dwursteisen
@dwursteisen
May 02 2016 13:24
Thanks! But in this case, s1 will be subscribed only when s2 completes. So it will be serialized (and we don't want that).
(and by writing that, I notice that wait.flatMap(a -> data.map(b -> a+b)) leads to the same issue)
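The "serialized" concern can be illustrated with a rough plain-Java analogy that uses CompletableFuture instead of RxJava. The class `SerializedVsEager`, the `dataCall` stand-in for the network call, and the log messages are all hypothetical, and the futures are completed by hand so the order is deterministic. `serialized()` starts the data call only after wait has a value, while `eager()` starts it immediately and holds the result until wait completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Plain-Java analogy (CompletableFuture, not RxJava) of serialized vs. eager subscription.
final class SerializedVsEager {

    // Pairs every data item with the value produced by wait.
    static List<String> pair(List<Integer> data, Long w) {
        List<String> out = new ArrayList<>();
        for (Integer d : data) out.add(d + "->" + w);
        return out;
    }

    // Hypothetical stand-in for the network call; logs when it is actually started.
    static Supplier<List<Integer>> dataCall(List<String> log) {
        return () -> {
            log.add("data call started");
            return Arrays.asList(1, 2, 3);
        };
    }

    // Serialized: the data call starts only after wait has produced its value.
    static List<String> serialized() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Long> wait = new CompletableFuture<>();
        CompletableFuture<List<String>> result =
                wait.thenApply(w -> pair(dataCall(log).get(), w));
        log.add("wait completes");
        wait.complete(0L);
        log.addAll(result.join());
        return log;
    }

    // Eager: the data call starts right away; its items are held until wait completes.
    static List<String> eager() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Long> wait = new CompletableFuture<>();
        CompletableFuture<List<Integer>> data =
                CompletableFuture.completedFuture(dataCall(log).get());
        CompletableFuture<List<String>> result = data.thenCombine(wait, SerializedVsEager::pair);
        log.add("wait completes");
        wait.complete(0L);
        log.addAll(result.join());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(serialized());
        System.out.println(eager());
    }
}
```

Both variants produce the same pairs; only the position of "data call started" in the log differs, which is the ordering being objected to here.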
Dorus
@Dorus
May 02 2016 13:25
You mean in my code? s1 will subscribe as soon as s2 yields just 1 element.
If it completes, nothing happens. Mm, might need to forward the onCompleted then.
Can you give a better example of what you want? (testcase/marble etc), i'm not following you.
dwursteisen
@dwursteisen
May 02 2016 13:27
Sorry, I hadn't noticed the take(1), but even in this case we have to wait for the first item before subscribing to the next observable.
Dorus
@Dorus
May 02 2016 13:27
But that's what connect does right?
Gotta go now, but will read back later. Perhaps others know a solution :)
dwursteisen
@dwursteisen
May 02 2016 13:28
Maybe I haven't correctly understood your solution either :D
dwursteisen
@dwursteisen
May 02 2016 13:45
Your solution is close to what we want, but it waits for an item from s2 before connecting s1.
Dorus
@Dorus
May 02 2016 14:05
oh, you want s1 to connect, but buffer?
dwursteisen
@dwursteisen
May 02 2016 14:10
In a way, yes.
(i.e. I expect more of a cache than a buffer, to avoid a List<Item> if I can)
dwursteisen
@dwursteisen
May 02 2016 14:47
Our current solution:
public <T1, T2, R> Observable.Transformer<T1, R> waitToCombineWith(Observable<T2> other, Func2<T2, T1, R> lambda) {
    return source -> {
        Observable<T1> s = source.replay(1).autoConnect();
        return Observable.concat(other, Observable.never()).buffer(s)
                         .flatMapIterable(i -> i)
                         .withLatestFrom(s, lambda);
    };
}
=> force buffering (concat obs + never) until the other obs finishes (buffer(s)), then compose
Dorus
@Dorus
May 02 2016 14:54
wait, now you do withLatestFrom(source), wasn't that supposed to be withLatestFrom(other)?
I'm also not sure what you get out of .buffer(s)
autoConnect seems dangerous here in case source yields 2 items right away.
@dwursteisen
dwursteisen
@dwursteisen
May 02 2016 14:57
You mean replay(1) is dangerous, no?
Dorus
@Dorus
May 02 2016 14:58
no i mean autoConnect, i prefer to do .publish(); subscribe(); .connect() like i did above.
because you subscribe multiple times, the second subscription might miss out on items if the source yields really quick.
Mmm thinking about it, that's not really an issue here. biggest thing is all values from other are stored in the buffer until source yields. While that's only required the very first time.
Do i also understand it right that you intend source to be wait and other to be data?
dwursteisen
@dwursteisen
May 02 2016 15:03
We just updated our code, as it wasn't clear enough. See below:
public <T1, T2, R> Observable.Transformer<T1, R> waitToCombineWith(Observable<T2> other, Func2<T1, T2, R> lambda) {
    return source -> {
        // Share a single subscription to `other` and replay its latest item.
        Observable<T2> s = other.replay(1).autoConnect();
        // Appending never() keeps the buffer open after `source` completes;
        // the buffered items are released when `s` emits.
        return Observable.concat(source, Observable.never()).buffer(s)
                         .flatMapIterable(i -> i)
                         .withLatestFrom(s, lambda);
    };
}

Observable<Long> wait = Observable.timer(1, TimeUnit.SECONDS)
                                  .doOnSubscribe(() -> System.out.println("PERFORM NETWORK CALL"))
                                  .doOnNext(i -> System.out.println("NETWORK RESPONSE"));

Observable<String> data = Observable.just(1, 2, 3, 4, 5)
                                    .doOnNext(System.out::println)
                                    .compose(waitToCombineWith(wait, (a, b) -> a + " -> " + b));

data.subscribe(System.out::println);
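
The buffering idea behind waitToCombineWith can be modeled in plain Java as a sketch, without RxJava. The class `WaitToCombineModel` and the `{stream, value}` timeline encoding are hypothetical. The model holds source items until `other` has emitted, then flushes them paired with that value. How source items arriving after that point are handled is an assumption of the model (they are paired with the latest value of `other`), not something the operator chain above is confirmed to do.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no RxJava dependency) of "cache source items until other emits".
final class WaitToCombineModel {

    // Assumed scenario: source emits 1..5 at once, other emits 0 afterwards.
    static List<String[]> chatTimeline() {
        List<String[]> t = new ArrayList<>();
        for (int i = 1; i <= 5; i++) t.add(new String[] {"source", String.valueOf(i)});
        t.add(new String[] {"other", "0"});
        return t;
    }

    // timeline: {stream, value} entries in emission order; stream is "source" or "other".
    static List<String> waitToCombineWith(List<String[]> timeline) {
        List<String> out = new ArrayList<>();
        List<String> pending = new ArrayList<>(); // source items seen before other emitted
        String latest = null;                     // latest value from other, if any
        for (String[] e : timeline) {
            if (e[0].equals("other")) {
                latest = e[1];
                // Flush everything that was waiting for this value.
                for (String item : pending) out.add(item + " -> " + latest);
                pending.clear();
            } else if (latest == null) {
                pending.add(e[1]);                // nothing to pair with yet: hold the item
            } else {
                out.add(e[1] + " -> " + latest);  // assumption: pair later items directly
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0]
        System.out.println(waitToCombineWith(chatTimeline()));
    }
}
```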