These are chat archives for ReactiveX/RxJava

24th
Oct 2018
Yannick Lecaillez
@ylecaillez
Oct 24 2018 08:17
Hello !
While working on a piece of code i discovered that Flowable.concatMap() was invoking upstream.cancel() once inner subscriber is done. What's the reason for that ?
Flowable.just(Boolean.TRUE)
                .repeat()
                .concatMap(token -> Flowable.just("blah").doOnCancel(() -> System.out.println("Cancelled")))
                .subscribe();
this seems to be an undesirable(?) side effect of SubscriptionArbiter in ConcatMapInner
Note that Flowable.flatMap() does not have this issue(?).
David Karnok
@akarnokd
Oct 24 2018 10:47
There is no need for that in respect to concat/concatMap but last time I've tried to change that broke a couple of other operators. I have to investigate if we need two arbiter modes or the replace-cancel could be done outside of the arbiter entirely.
Yannick Lecaillez
@ylecaillez
Oct 24 2018 12:30
That's not an issue when using Flowable given that cancelling a completed Flowable should be sthg like a no-op.
The behavior for Completable, Maybe and Single is less obvious to me.
If i have a Single<Socket> connect(). And this one succeed. Is an invocation of dispose() should close the socket or not ?
Yannick Lecaillez
@ylecaillez
Oct 24 2018 12:36
Well ... i think it should not close the socket. Dispose is actually related to the Single, not to the resource forwarded by the Single, isn't it ?
Yannick Lecaillez
@ylecaillez
Oct 24 2018 12:42
The difficult part here is to handle race conditions... (i.e: what if the connection is successfully established and forwarded at the exact same time downstream invoked dispose(): the socket might be leaked)
so this would favor a socket.close() into the dispose() impl ...But then you are subject to operators behaviors (like concatMap() which cancel()/dispose() upstream once completed)
I would greatly appreciate ideas/opinions on that :)
(note that for now i've replaced concatMap(foo) by flatMap(foo, 1) but it's probably less efficient ...)
Yannick Lecaillez
@ylecaillez
Oct 24 2018 15:00
At the end, i think operators should never cancel() upstream unless they're propagating a downstream cancel() invocation OR if it's the expected behavior of the operator to limit the upstream (i.e: takeWhile()).
As such, i think the current behavior of concatMap() exposed by my snippet should be considered as a bug.
WDYT ?
Yannick Lecaillez
@ylecaillez
Oct 24 2018 15:27
Ouch, sadly the flatMap(foo, 1) hack seems to have some serious performance impact :-(
Yannick Lecaillez
@ylecaillez
Oct 24 2018 16:00
I just wrote a custom DropCancel operator and kept the concatMap(). It seems to have less performance impact.
kerr
@hepin1989
Oct 24 2018 19:14
 Observable.create(delegate)
            .buffer(1, TimeUnit.SECONDS, MessageProducer.LMT)
            .filter(CollectionUtil::isNotEmpty)
If I emits concurrently by call delegate.onNext, will it lost messages?
David Stocking
@dmstocking
Oct 24 2018 21:08
@akarnokd Thanks for taking the time to explain that :)