These are chat archives for ReactiveX/RxJava

22nd
Feb 2018
Eugene Popovich
@httpdispatch
Feb 22 2018 11:37

How to properly perform action after the subscription? doOnSubscribe doesn't fit the needs

Example

PublishProcessor<Integer> processor = PublishProcessor.create();
processor
                .doOnSubscribe(subscription -> processor.onNext(1))
                .subscribe(System.out::println);
 processor.onNext(2);

prints

2

As a workaround i use this approach

processor
                .mergeWith(Completable.fromAction(() -> processor.onNext(1)).toFlowable())
                 .subscribe(System.out::println);

This prints

1
2

as expected

David Karnok
@akarnokd
Feb 22 2018 12:54
@httpdispatch startWith. doOnSubscribe is not good in this case because the processor's onNext doesn't see the subscriber at that moment.
Eugene Popovich
@httpdispatch
Feb 22 2018 13:25
@akarnokd Thanks for the answer. startWith is good if item may be ignored by any other processor subscribers. How i understand i should use some custom workaround in this case
David Karnok
@akarnokd
Feb 22 2018 13:27
I can't imagine why you need such reentrance, i.e., why old subscribers should be notified about new subscribers .
Eugene Popovich
@httpdispatch
Feb 22 2018 13:34

@akarnokd I have 2 event bus like processors. One is for requests second is for results. There may be multiple request handlers which will post a results to the result processor. This may be either synchronously or asynchronously. I want to make a request and wait for the result in the same flowable

Simplified case

PublishProcessor<Integer> requestBus = PublishProcessor.create();
PublishProcessor<Pair<Integer, Integer>> resultBus = PublishProcessor.create();
requestBus.filter(value -> value == 1)
        .map(value -> Pair.create(value, 10))
        .subscribe(resultBus);

Integer request = 1;
Flowable<Integer> resultFlowable = resultBus
        .filter(requestResultPair -> requestResultPair.first == request)
        .doOnSubscribe(__ -> requestBus.onNext(request));