These are chat archives for ReactiveX/RxJava

Mar 2018
Mar 06 2018 09:13
What exactly is RxJava?
Yannick Lecaillez
Mar 06 2018 16:35
Hi all !
For information, we've run some benchmark of our product and we noticed some contention on CompositeDisposable add/delete (from FlowableFlatMapCompletableCompletable .FlatMapCompletableMainSubscriber). These methods are in synchronized block.
We switched to standard FlowableFlatMap which solved the contention (but i think it is adding a bit more pressure on GC because of the queue allocation. But i did not check that point).
This is just for informative purpose in case you're looking for some area to optimize ;-)
Yannick Lecaillez
Mar 06 2018 16:43
I was thinking about doing it myself and was thinking about using (i think) the same approach than ConcurrentHashMap: use multiple OpenHashSet instance and distribute the add/remove/lock on these instances based on the disposable hashCode. Disposable.hashCode would then be invoked twice but i think it should actually be Object.hashCode() most of the time so it should not be that costly.
Sthg like (pseudo code) void add(Disposable d) { OpenHashSet h = getHashSet(d.hashCode() % concurrencyFactor); synchronized(h) { h.add(d) }; }
or maybe synchronized on another object from a lock table so that we can create hash set on-demand... anyway.
Would you accept such PR or maybe you have a better way to solve it ?