These are chat archives for ReactiveX/RxJava

15th
Jul 2017
Troy Patrick
@teasp00n
Jul 15 2017 07:04

I have a need to do something once a stream is subscribed to for the first time. I have written the following transformer to do it:

  public static <T> Observable.Transformer<T, T> doOnFirstSubscribe(final Action0 func) {
    return new Observable.Transformer<T, T>() {
      boolean firstSubscription = true;
      @Override
      public Observable<T> call(Observable<T> input) {
        return input.doOnSubscribe(() -> {
          if (firstSubscription) {
            func.call();
            firstSubscription = false;
          }
        });
      }
    };
  }

i've written tests for it and they pass however during usage in my application it appears to call func when the stream is resubscribed to.

does my transformer seem correct?

Yannick Lecaillez
@ylecaillez
Jul 15 2017 09:56
@akarnokd note the cancel(), invoked asynchronously. All groups but one are removed. Only the item being queued for the remaining group "1" causes the OOME.