These are chat archives for ReactiveX/RxJava

1st
Jan 2016
Dorus
@Dorus
Jan 01 2016 00:09
public static Observable getPinger(NetworkSource nws) {
    return Observable.create(o -> {
        nws.connect();
        o.add(Subscriptions.create(() -> {
            nws.cancel();
        }));
        try {
            String ping;
            while (!o.isUnsubscribed() && nws.connected) {
                try {
                    ping = nws.getPing();
                } catch (Exception e) {
                    if (!o.isUnsubscribed()) {
                        o.onError(e);
                    }
                    return;
                }
                o.onNext(ping);
            }
            if (!o.isUnsubscribed()) {
                o.onCompleted();
            }
        } finally {
            nws.disconnect();
        }
    });
}
Something along these lines.
Made a few assumptions here. NetworkSource nws should actually be a factory, but i was assuming there's only one, and you're not subscribing more than once.