These are chat archives for ReactiveX/RxJava

8th
Mar 2018
Eugene Popovich
@httpdispatch
Mar 08 2018 12:47

I've experienced unexpected behaviour using the share() operator

@Test
public void testConcatShare() {
    Maybe<Boolean> maybe1 = Maybe.defer(() -> {
        System.out.println("Call from maybe1");
        return Maybe.empty();
    });
    Maybe<Boolean> maybe2 = Maybe.defer(() -> {
        System.out.println("Call from maybe2");
        return Maybe.empty();
    });
    Flowable<Boolean> flowable = Maybe.concat(maybe1, maybe2).share();
    flowable.subscribe(System.out::println);
    flowable.subscribe(System.out::println);
}

unexpectedly prints

Call from maybe1
Call from maybe2
Call from maybe1
Call from maybe2

Should I use the cache() operator in such a case instead of share() to avoid the repeated calls in the defer method?

David Karnok
@akarnokd
Mar 08 2018 14:15
@ylecaillez lock striping could improve performance, but it requires a known number of stripes so that all of them can be cancelled. The problem is that the particular CompositeDisposable is somewhat deep inside that operator. Also, arbitrarily forcing striping in this operator may not be what users want or expect. If that flatMapCompletable is a performance bottleneck for you, you could implement a variant that features more than one CompositeDisposable set (as individual fields or as an array). There you don't have to change CompositeDisposable, only the locations of the set.add(), set.delete() and set.dispose() calls to pick the right stripe.
Otherwise, I don't think a change in RxJava is necessary.
Yannick Lecaillez
@ylecaillez
Mar 08 2018 14:21
Got it, thank you.
Actually, using flatMap() rather than flatMapCompletable() already solved the problem for us. It is just counter-intuitive that the more "specialized" method (flatMapCompletable) is actually less efficient than the more "general" one (flatMap).
for our use-case, at least :)
David Karnok
@akarnokd
Mar 08 2018 14:22
@httpdispatch In your case, the setup is synchronous so by the time the second subscribe is called, the share has completed and reset to its unconnected state. The next subscribe will trigger a fresh connection and thus repeat the synchronous part again. You should consider using cache() or replay().autoConnect().
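A minimal sketch of the suggested fix, assuming RxJava 2.x (io.reactivex) imports: replacing share() with cache() retains the terminal events, so the second subscribe replays them instead of re-triggering the defers.

```java
import io.reactivex.Flowable;
import io.reactivex.Maybe;

public class ConcatCacheExample {
    public static void main(String[] args) {
        Maybe<Boolean> maybe1 = Maybe.defer(() -> {
            System.out.println("Call from maybe1");
            return Maybe.empty();
        });
        Maybe<Boolean> maybe2 = Maybe.defer(() -> {
            System.out.println("Call from maybe2");
            return Maybe.empty();
        });
        // cache() keeps the (terminal) signals after the first run,
        // so a later subscribe does not re-subscribe to the upstream.
        Flowable<Boolean> flowable = Maybe.concat(maybe1, maybe2).cache();
        flowable.subscribe(System.out::println);
        flowable.subscribe(System.out::println); // no second "Call from ..." lines
    }
}
```

replay().autoConnect() behaves similarly here; the difference is mainly in how the first connection is triggered.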
David Karnok
@akarnokd
Mar 08 2018 14:28
flatMap uses copy-on-write, which is more efficient if the number of active inner sources is less than 8, as the whole array fits nicely into a cache line. I guess the deciding factor is the maxConcurrency parameter, if you actually set it in your experiments: <= 8 could be a copy-on-write setup, 8-32 a single composite, and 32+ a striped composite with stripes equal to the available CPUs. Thread IDs could then also be the factor for picking a stripe.
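A rough, hypothetical sketch of the copy-on-write idea described above (plain Java, not RxJava's actual internals): each add or delete swaps in a fresh copy of a small array via CAS, which stays cheap while the array is short.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CowComposite {
    // With fewer than ~8 entries the whole array fits into a cache line,
    // so copying it on every mutation is cheap.
    private final AtomicReference<Object[]> array =
            new AtomicReference<>(new Object[0]);

    public void add(Object d) {
        for (;;) { // retry loop: CAS may lose a race and need a fresh copy
            Object[] cur = array.get();
            Object[] next = new Object[cur.length + 1];
            System.arraycopy(cur, 0, next, 0, cur.length);
            next[cur.length] = d;
            if (array.compareAndSet(cur, next)) {
                return;
            }
        }
    }

    public void delete(Object d) {
        for (;;) {
            Object[] cur = array.get();
            int idx = -1;
            for (int i = 0; i < cur.length; i++) {
                if (cur[i] == d) { idx = i; break; }
            }
            if (idx < 0) return; // not present
            Object[] next = new Object[cur.length - 1];
            System.arraycopy(cur, 0, next, 0, idx);
            System.arraycopy(cur, idx + 1, next, idx, cur.length - idx - 1);
            if (array.compareAndSet(cur, next)) return;
        }
    }

    public int size() {
        return array.get().length;
    }
}
```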
Yannick Lecaillez
@ylecaillez
Mar 08 2018 14:31
Oh, that's interesting!
maxConcurrency is actually set to 64, but our test configuration (the client side) was set up so that it effectively never goes beyond 4.
The 8-32 sizing actually fits our use case perfectly, given that beyond 16 or so we have other bottlenecks appearing.
David Karnok
@akarnokd
Mar 08 2018 14:37
Well, this got my attention after all. Let me measure/experiment with the various composites for flatMapCompletable and see if there is an actual gain in async cases (and not much loss in synchronous cases, where synchronized can be optimized away by the JIT).
Yannick Lecaillez
@ylecaillez
Mar 08 2018 14:38
Cool :) Please let me/us know the outcomes!
David Karnok
@akarnokd
Mar 08 2018 15:25

For the synchronous uses, my i7 4790 produces these Mops/s values for a pair of add/delete:

CompositeDisposable: 18.5 (empty), 15.4 (preoccupied)
FixedComposite (8 slots): 43.4 (empty), 26.5 (7 slots always taken)
StripedComposite (8 stripes): 18.5 (empty), 16.1 (some preoccupation)

Yannick Lecaillez
@ylecaillez
Mar 08 2018 15:28
So it looks like FixedComposite is a clear winner? I guess it can be used whenever the maxConcurrency setting is set?
The array vs. set might play a role here.
David Karnok
@akarnokd
Mar 08 2018 15:30
These are only the synchronous use cases. Measuring the async case where the composites get really pounded takes a bit more time.
Yannick Lecaillez
@ylecaillez
Mar 08 2018 15:30
I guess FixedComposite's superiority would be even more significant in the async case.
How is it implemented? Does each concurrent inner subscriber have its own dedicated index, and if so, how do you find a free slot? Maybe it's actually an add/remove approach, moving the work to the remove phase...
Do you have any git repo/branch/anything you could share?
David Karnok
@akarnokd
Mar 08 2018 15:41
image.png
This compares the various types when adding/deleting 1M Disposable each concurrently and with some work between add and remove:
Yannick Lecaillez
@ylecaillez
Mar 08 2018 15:42
those numbers are totally awesome
Yannick Lecaillez
@ylecaillez
Mar 08 2018 15:43
I'm fighting every day to win 5% and here you have +300% improvement :)
David Karnok
@akarnokd
Mar 08 2018 15:44
The fixed one uses linear search for an empty slot in a small array, knowing no more than the slot count should ever be needed.
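A hypothetical sketch of such a fixed-slot container (not the actual RxJava implementation), using Runnable as a stand-in for Disposable so it stays self-contained:

```java
public class FixedSlotComposite {
    // Runnable stands in for Disposable; run() stands in for dispose().
    private final Runnable[] slots;
    private boolean disposed;

    public FixedSlotComposite(int capacity) {
        this.slots = new Runnable[capacity];
    }

    // Linear scan for an empty slot; the caller sizes the capacity to
    // maxConcurrency, so a free slot should normally exist.
    public synchronized boolean add(Runnable d) {
        if (disposed) return false;
        for (int i = 0; i < slots.length; i++) {
            if (slots[i] == null) {
                slots[i] = d;
                return true;
            }
        }
        return false; // capacity exceeded
    }

    // Linear scan for the matching entry.
    public synchronized boolean delete(Runnable d) {
        for (int i = 0; i < slots.length; i++) {
            if (slots[i] == d) {
                slots[i] = null;
                return true;
            }
        }
        return false;
    }

    // Dispose every still-registered entry and reject further adds.
    public synchronized void dispose() {
        if (disposed) return;
        disposed = true;
        for (int i = 0; i < slots.length; i++) {
            if (slots[i] != null) {
                slots[i].run();
                slots[i] = null;
            }
        }
    }
}
```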
Yannick Lecaillez
@ylecaillez
Mar 08 2018 15:45
I wonder if we couldn't use a bitmap trick to maintain free slot indexes.
David Karnok
@akarnokd
Mar 08 2018 15:45
If there is small in-between work (1-100), it looks like a significant win; not so much for longer in-between work.
I wonder if we couldn't use a bitmap trick to maintain free slot indexes.
Open addressing could be used here: mod the hashCode to get the initial index, then scan forward for an empty slot.
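A small illustrative helper (purely hypothetical, not from RxJava) showing the open-addressing scan just described, over a power-of-two table so the "mod" is a bit mask:

```java
public class OpenAddressing {
    // Returns the index where `key` can be placed (or already sits):
    // start at the hashCode masked into the table, then scan forward
    // with wrap-around until a free or matching slot is found.
    public static int findSlot(Object[] slots, Object key) {
        int mask = slots.length - 1;      // table length must be a power of two
        int i = key.hashCode() & mask;    // cheap "mod" via bit mask
        for (int n = 0; n < slots.length; n++) {
            if (slots[i] == null || slots[i] == key) {
                return i;
            }
            i = (i + 1) & mask;           // scan forward for an empty slot
        }
        return -1;                        // table is full
    }
}
```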
Yannick Lecaillez
@ylecaillez
Mar 08 2018 15:47
that's smart! Never thought about that :)
oh yes, but less data locality
and probably restricted to powers of two for an efficient mod
Yannick Lecaillez
@ylecaillez
Mar 08 2018 15:52
Still, it might be worth it for bigger values of concurrency.
David Karnok
@akarnokd
Mar 08 2018 16:46
This was my experiment repo; now I have to see if it's still beneficial when integrated into flatMapCompletable inside the RxJava repo.
Yannick Lecaillez
@ylecaillez
Mar 08 2018 16:54
Can't wait to see that :)
Yannick Lecaillez
@ylecaillez
Mar 08 2018 17:11
Do you think it could be beneficial to wrap the Disposable during add() in a wrapper that also memoizes the slot index, so that it can be removed in O(1)?
(for FixedCompositeDisposable)
David Karnok
@akarnokd
Mar 08 2018 17:33
That doesn't work. Whoever calls that Disposable's dispose() directly won't go through the wrapper.
Yannick Lecaillez
@ylecaillez
Mar 08 2018 17:34
ah, of course. silly me.
David Karnok
@akarnokd
Mar 08 2018 18:00
This is the baseline comparison in 2.1.11 snapshot:
image.png
Note that flatMapCompletable is not optimized for a scalar upstream like flatMap is.
Even though these are pure synchronous runs, the maxConcurrency has some effect due to the frequency of requesting more source items.
David Karnok
@akarnokd
Mar 08 2018 19:09
The baseline for asynchronous completables:
image.png
David Karnok
@akarnokd
Mar 08 2018 20:05
Well, the upgraded flatMapCompletable is slightly slower most of the time:
image.png
David Karnok
@akarnokd
Mar 08 2018 20:38
Well, the async use isn't promising either:
image.png