These are chat archives for ReactiveX/RxJava

31st
Oct 2018
Incubator
@incube8r
Oct 31 2018 17:56
Hello anyone around?
David Karnok
@akarnokd
Oct 31 2018 19:13
@htmsousa Why use window(boundary)? I'd pick a tool for my use case, not the other way around.
@incube8r Just ask your question.
Helder Sousa
@htmsousa
Oct 31 2018 23:24
@akarnokd thank you for your feedback.. at the end of the day, I guess I was over-complicating and Flowable.distinctUntilChanged() will be enough for my use case.. that said, if the API provides a method Flowable.window(boundaryIndicatorSupplier) it would be nice to have a little example showing the usage of that approach
Helder Sousa
@htmsousa
Oct 31 2018 23:33

as an academic example, here is something I came up with:

           // source of data
            Flowable<Integer> source = Flowable.fromArray(1, 1, 2, 2, 2, 3, 5, 5, 4, 4).publish().autoConnect(2);
            // create a boundary flowable that will emmit a new element when the sequence
            // goes from odd to even numbers (and vice versa)
            Flowable<Integer> boundary = source.distinctUntilChanged((p, n) -> p % 2 == n % 2);

            // create a window every time the boundary emmits a new element (each window
            // will contain a sequence of elements that are odd or even numbers)
            source
                .window(boundary)
                .doOnEach(w -> System.out.println("Source - New Window"))
                .forEach(window -> window.subscribe(j -> System.out.println(String.format("Value %s fetched", j))));

output

Source - New Window
Source - New Window
Value 1 fetched
Value 1 fetched
Source - New Window
Value 2 fetched
Value 2 fetched
Value 2 fetched
Source - New Window
Value 3 fetched
Value 5 fetched
Value 5 fetched
Source - New Window
Value 4 fetched
Value 4 fetched
Source - New Window