These are chat archives for ReactiveX/RxJava

12th
Jul 2017
Yannick Lecaillez
@ylecaillez
Jul 12 2017 18:45

Hi guys !
I'm working on a custom groupBy() and reading the RxJava's one i wonder if there is a bug or something i don't get: it seems that groupBy() is request()ing twice its upstream for each receive item when each of these item is creating a new group:

public void testGroup() {
        final AtomicLong requested = new AtomicLong();
        Flowable.range(1, 2048)
                .doOnRequest(requested::addAndGet)
                .groupBy(e -> e)
                .subscribe(g -> g.subscribe(TestSubscriber.create()));

        System.out.println("Requested: " + requested);
}

Displays:
Requested: 4224

Ideally i would have expected 128 + 2048 so that requested of the source remains at 128.
Am i missing something ?
(I'm using RxJava 2.1.0)
Stephen Berger
@GettingNifty
Jul 12 2017 18:54
What's adsAndGet
Yannick Lecaillez
@ylecaillez
Jul 12 2017 18:54
its AtomicLong.addAndGet(requested)
Stephen Berger
@GettingNifty
Jul 12 2017 18:56
So is it retrieving and adding it toitself somehow .. I'm a newb
Or your e reference
Calling itself? How does that work
Yannick Lecaillez
@ylecaillez
Jul 12 2017 18:57
The problem i can see is if 2048 is not 2048 but something way bigger, at some point the requested value from the source will be huge.
And if the source is at some point able to produce such number of items going into one group, we might hit an OOME ?
@GettingNifty This is lamba. Basically its more or less the equivalent of:
.doOnRequest(new LongConsumer() {
    void accept(long t) {
        requested.addAndGet(t);
    }
})
Stephen Berger
@GettingNifty
Jul 12 2017 19:18
Maybe the one is the second array
Try 0
Yannick Lecaillez
@ylecaillez
Jul 12 2017 21:34
Ok, i'm able to produce an OOME with groupBy() :-(