These are chat archives for ReactiveX/RxJava

17th
Aug 2018
骨来(PeterLi)
@pli2014
Aug 17 2018 02:13
        Flowable
                .create(emitter -> {
                    int i = 0;
                    while (i++ < 10) {
                        emitter.onNext(RandomUtils.nextInt() + ":" + i);
                        Thread.sleep(100);
                    }
                    emitter.onComplete();
                }, BackpressureStrategy.BUFFER)
                //.subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.io())
                .buffer(1, TimeUnit.SECONDS, Schedulers.trampoline())
                .subscribe(list -> {
                    System.out.println("BlockedBufferRxTest.main:" + list.toString());
                });

        Thread.sleep(1000000);
Passing Schedulers.trampoline() to buffer blocks the current main thread, so the subscribe callback never gets executed. There are buffer overloads that do not take a scheduler argument and use Schedulers.computation() instead.
Yannick Lecaillez
@ylecaillez
Aug 17 2018 10:50
@dmstocking How is this different from what I've done? I'm in Java btw.
Isn't there a backpressure problem here?
Volkan Yazıcı
@vy
Aug 17 2018 13:08

Using RxJava 1.1.9, I have a problem as follows:

import rx.Observable;

public enum ZipWithTest {;

    public static void main(String[] args) {
        Observable
                .just(1, 2, 3)
                .doOnNext(next -> System.out.format("next: %d%n", next))
                .zipWith(
                        Observable.just(10, 100, 1000),
                        (next, multiplier) -> next * multiplier)
                .doOnNext(multiplied -> System.out.format("multiplied: %d%n", multiplied))
                .toBlocking()
                .last();
    }

    /*
    next: 1
    next: 2
    next: 3
    multiplied: 10
    multiplied: 200
    multiplied: 3000
     */

}

I was expecting to get the following:

    next: 1
    multiplied: 10
    next: 2
    multiplied: 200
    next: 3
    multiplied: 3000

What am I missing?

骨来(PeterLi)
@pli2014
Aug 17 2018 13:23
@vy doOnNext only sees the elements of the just(1, 2, 3) source.
It invokes its action each time that source calls onNext.
Volkan Yazıcı
@vy
Aug 17 2018 13:31
@pli2014 I got it. But why is it getting every element from just(1,2,3) and then zipping with the rest? Shouldn't they go in parallel? That is, one from source, one from multiplier, one from source, one from multiplier, and so on.
骨来(PeterLi)
@pli2014
Aug 17 2018 14:43
That is just how zipWith behaves.
doOnNext is called after every onNext from the source, once per element.
Ignacio Baca Moreno-Torres
@ibaca
Aug 17 2018 22:38
@vy You should really upgrade to RxJava 2. Since the RxJava 1 Observable has backpressure, this could work the way you described, but the problem is that zipWith has a buffer, so the 3 next calls are just filling the zip buffer. I'm not sure whether RxJava 1 has a zipWith overload that lets you set the buffer size, but you can do it in RxJava 2; in that case you need to use Flowable instead of Observable to support backpressure.
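To make the buffer explanation above concrete, here is a minimal sketch in plain Java, with no RxJava dependency. It is a toy model and not RxJava's actual implementation: the class ZipPrefetchModel, the method run, and the parameter prefetch are hypothetical names, and the model simply assumes that zip pulls up to prefetch items from the left source into a queue before it starts pairing them. Under that assumption, a large prefetch reproduces the grouped output from the question, while a prefetch of 1 gives the interleaved output that was expected.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Toy model only: this is NOT how RxJava implements zip internally.
final class ZipPrefetchModel {

    // Zips {1, 2, 3} with {10, 100, 1000}, pulling up to `prefetch` items
    // from the left source into a queue before pairing them.
    static List<String> run(int prefetch) {
        List<String> log = new ArrayList<>();
        Iterator<Integer> left = Arrays.asList(1, 2, 3).iterator();
        Iterator<Integer> right = Arrays.asList(10, 100, 1000).iterator();
        Queue<Integer> buffer = new ArrayDeque<>();

        while (true) {
            // Refill step: the upstream "doOnNext" fires as each item enters the buffer.
            while (buffer.size() < prefetch && left.hasNext()) {
                int next = left.next();
                log.add("next: " + next);
                buffer.add(next);
            }
            if (buffer.isEmpty() || !right.hasNext()) {
                break;
            }
            // Drain step: pair every buffered item with one item from the right source.
            while (!buffer.isEmpty() && right.hasNext()) {
                log.add("multiplied: " + buffer.poll() * right.next());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println("prefetch = 128");
        run(128).forEach(System.out::println);
        System.out.println("prefetch = 1");
        run(1).forEach(System.out::println);
    }
}
```

With prefetch = 128 all three "next" lines come before any "multiplied" line; with prefetch = 1 they alternate. In real code the corresponding knob would be the bufferSize argument of the RxJava 2 Flowable zipWith overload; how closely the real operator follows this model is an assumption to verify against the version in use.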