These are chat archives for ReactiveX/RxJava

12th
Jul 2017
Yannick Lecaillez
@ylecaillez
Jul 12 2017 18:45 UTC

Hi guys !
I'm working on a custom groupBy(), and reading RxJava's implementation I wonder if there is a bug or something I don't get: it seems that groupBy() calls request() on its upstream twice for each received item when each of these items creates a new group:

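// Requires RxJava 2.x on the classpath.
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;
import java.util.concurrent.atomic.AtomicLong;

public void testGroup() {
    // Accumulates every amount requested from the range() source.
    final AtomicLong requested = new AtomicLong();
    Flowable.range(1, 2048)
            .doOnRequest(requested::addAndGet)
            .groupBy(e -> e) // every item gets its own key, so 2048 groups are created
            .subscribe(g -> g.subscribe(TestSubscriber.create()));

    System.out.println("Requested: " + requested);
}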

Displays:
Requested: 4224

Ideally I would have expected 128 + 2048, so that the outstanding demand on the source stays at 128.
Am I missing something?
(I'm using RxJava 2.1.0)
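
The observed total is consistent with one initial prefetch of 128 plus two request(1) calls per item, since 128 + 2 × 2048 = 4224, whereas a single replenishment per item would give 128 + 2048 = 2176. A minimal sketch of that arithmetic follows. The class and method names are hypothetical, and it models only the counting, not what groupBy() actually does internally:

```java
public class DemandModel {
    // Hypothetical model: an initial prefetch plus a fixed number of
    // request(1) calls for every item received from upstream.
    static long totalRequested(long prefetch, long items, long requestsPerItem) {
        return prefetch + items * requestsPerItem;
    }

    // Demand still outstanding once all items have been delivered.
    static long outstanding(long prefetch, long items, long requestsPerItem) {
        return totalRequested(prefetch, items, requestsPerItem) - items;
    }

    public static void main(String[] args) {
        System.out.println(totalRequested(128, 2048, 1)); // one replenishment per item
        System.out.println(totalRequested(128, 2048, 2)); // two requests per item
        System.out.println(outstanding(128, 2048, 2));    // demand left over in that case
    }
}
```

Under this model the outstanding demand grows by one for every item that opens a new group, instead of staying at the prefetch amount.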
Stephen Berger
@GettingNifty
Jul 12 2017 18:54 UTC
What's addAndGet?
Yannick Lecaillez
@ylecaillez
Jul 12 2017 18:54 UTC
It's AtomicLong.addAndGet(), called with the requested amount.
Stephen Berger
@GettingNifty
Jul 12 2017 18:56 UTC
So is it retrieving the value and adding it to itself somehow? I'm a newb.
Or is it your e reference
calling itself? How does that work?
Yannick Lecaillez
@ylecaillez
Jul 12 2017 18:57 UTC
The problem I can see is that if 2048 were something way bigger, at some point the amount requested from the source would be huge.
And if the source is at some point able to produce that many items going into one group, we might hit an OOME?
@GettingNifty This is a method reference, a shorthand for a lambda. Basically it's more or less the equivalent of:
.doOnRequest(new LongConsumer() {
    @Override
    public void accept(long t) { // must be public to implement the interface method
        requested.addAndGet(t);
    }
})
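
To make the equivalence concrete, here is a small self-contained sketch showing that the method reference, the lambda, and the anonymous class all accumulate the same total. It is an illustration under assumptions: it uses java.util.function.LongConsumer from the JDK as a stand-in for RxJava's own LongConsumer so that it runs without RxJava, and the class and helper names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

public class MethodRefDemo {
    // Feeds two sample request amounts to the callback and returns the total.
    static long run(AtomicLong counter, LongConsumer callback) {
        callback.accept(128);
        callback.accept(1);
        return counter.get();
    }

    static long viaMethodRef() {
        AtomicLong requested = new AtomicLong();
        return run(requested, requested::addAndGet);
    }

    static long viaLambda() {
        AtomicLong requested = new AtomicLong();
        return run(requested, t -> requested.addAndGet(t));
    }

    static long viaAnonymousClass() {
        AtomicLong requested = new AtomicLong();
        return run(requested, new LongConsumer() {
            @Override
            public void accept(long t) {
                requested.addAndGet(t); // return value is simply discarded
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(viaMethodRef());
        System.out.println(viaLambda());
        System.out.println(viaAnonymousClass());
    }
}
```

In all three forms the value returned by addAndGet() is ignored; only the side effect on the counter matters.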
Stephen Berger
@GettingNifty
Jul 12 2017 19:18 UTC
Maybe the one is the second array
Try 0
Yannick Lecaillez
@ylecaillez
Jul 12 2017 21:34 UTC
OK, I'm able to produce an OOME with groupBy() :-(