These are chat archives for ReactiveX/RxJava

21st
Dec 2017
Yannick Lecaillez
@ylecaillez
Dec 21 2017 10:49
Hi guys! I have a question regarding resource management:
I have a Single<Connection> which connects to a server each time I subscribe to it.
Given the following code: singleConnection.timeout(10, SECONDS).subscribe().
If the timeout occurs, it'll forward an error and cancel() singleConnection.
My question: what if the connection finally succeeds after the timeout has triggered but just before cancel() is invoked?
If nothing is done, I think this race condition will leak the established connection.
Now I'm wondering what the best way to handle such a case is.
One way is to close the connection in singleConnection.cancel(), but given that singleConnection has already emitted the connection, shouldn't cancel() be a no-op?
Yannick Lecaillez
@ylecaillez
Dec 21 2017 10:55
Maybe I need some synchronization between cancel() (dispose(), actually) and the onSuccess() notification.
Yannick Lecaillez
@ylecaillez
Dec 21 2017 11:18
Actually, I might have solved this issue with an AtomicBoolean: after dispose() returns, it is guaranteed that the connection either has already been forwarded to the subscriber or will never be forwarded to it (if the connection gets established in the meantime, it is silently closed).
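A minimal sketch of that handoff idea in plain Java, without any RxJava types. FakeConnection, ConnectionHandoff and connected() are hypothetical names introduced for illustration, and a three-state AtomicReference stands in for the AtomicBoolean mentioned above. Whichever of connected() and dispose() wins the compare-and-set decides the connection's fate:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical stand-in for the Connection type from the discussion. */
final class FakeConnection {
    volatile boolean closed;

    void closeSilently() {
        closed = true;
    }
}

/** Decides exactly once whether a connection is delivered or closed. */
final class ConnectionHandoff {
    private enum State { PENDING, DELIVERED, DISPOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.PENDING);
    private final Consumer<FakeConnection> subscriber;

    ConnectionHandoff(Consumer<FakeConnection> subscriber) {
        this.subscriber = subscriber;
    }

    /** Called by the connecting thread once the connection is established. */
    void connected(FakeConnection connection) {
        if (state.compareAndSet(State.PENDING, State.DELIVERED)) {
            // Won the race against dispose(): forward the connection.
            subscriber.accept(connection);
        } else {
            // dispose() won: the subscriber will never see this connection.
            connection.closeSilently();
        }
    }

    /** Called on cancellation, for example when a timeout fires. */
    void dispose() {
        // No-op if the connection has already been delivered.
        state.compareAndSet(State.PENDING, State.DISPOSED);
    }
}
```

Note that this only makes the handoff between the source and its direct subscriber atomic. If that subscriber is an intermediate operator that drops the value after it has been handed over, the connection can still be lost.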
Oleh Dokuka
@OlegDokuka
Dec 21 2017 11:48
@ylecaillez, actually, you can always write something like the following:
Flux.just("")
    .flatMap(v -> Flux.just("Opened")
                      .delaySubscription(Duration.ofSeconds(2))
                      .doFinally(s -> System.out.println(s)))
    .timeout(Duration.ofSeconds(2), Flux.just("closed"))
    .subscribe(System.out::println);
That way you always have a chance to clean up your resources if the flattened connection gets cancelled.
This example works like try/finally.
P.S. This example is written using Reactor 3, but don't worry, RxJava 2 has an equivalent API.
David Karnok
@akarnokd
Dec 21 2017 12:13
Unfortunately, you can't solve this properly with RxJava 2 and Java 6. Either you need a resource-aware flow implementation, which only exists on the drawing board, or you need Java 9 and the Cleaner API to handle the Connection falling out of scope. However, the latter can get complicated because you may not know all the references to the Connection object.
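For reference, a rough sketch of what the Java 9 Cleaner route could look like. TrackedConnection and Resource are hypothetical names, and the cleanup action only flips a flag where a real one would close a socket. The cleanup state lives in a separate static class so that it holds no reference to the tracked object, otherwise the object could never become unreachable:

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical connection wrapper whose cleanup runs on close() or after GC. */
final class TrackedConnection implements AutoCloseable {
    private static final Cleaner CLEANER = Cleaner.create();

    /** Cleanup state; must not reference the TrackedConnection itself. */
    static final class Resource implements Runnable {
        final AtomicBoolean released = new AtomicBoolean();

        @Override
        public void run() {
            // A real implementation would close the underlying socket here.
            released.set(true);
        }
    }

    final Resource resource = new Resource();

    // Runs resource.run() at most once: on close(), or at some unspecified
    // time after this object becomes phantom reachable.
    private final Cleaner.Cleanable cleanable = CLEANER.register(this, resource);

    @Override
    public void close() {
        cleanable.clean();
    }
}
```

The Cleaner gives no guarantee about how soon a leaked connection is released, and it only runs once every reference to the object is gone, which is the complication mentioned above.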
Yannick Lecaillez
@ylecaillez
Dec 21 2017 14:13

I finally wrote something like this:

        @Override
        public void dispose() {
            if (done.compareAndSet(false, true) && connection != null) {
                // Handle the case where we've been cancelled while the connection has actually been established but not
                // yet forwarded to the subscriber (e.g. because of a slow SSL handshake).
                connection.closeSilently();
            }
        }

WDYT?

Yannick Lecaillez
@ylecaillez
Dec 21 2017 15:00
Well, I guess this is a bit too abstract out of context... :smile: Anyway, thanks!