RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
How do I properly perform an action after the subscription? doOnSubscribe doesn't fit my needs.
Example
PublishProcessor<Integer> processor = PublishProcessor.create();
processor
.doOnSubscribe(subscription -> processor.onNext(1))
.subscribe(System.out::println);
processor.onNext(2);
prints
2
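A likely reason only 2 is printed: the doOnSubscribe callback appears to run before PublishProcessor has registered the new subscriber, so the 1 is emitted while nobody is listening. Below is a JDK-only sketch of that ordering. MiniBus and SubscribeOrderDemo are hypothetical names made up for illustration, not RxJava classes, and the "hook first, register second" order is an assumption about the processor's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SubscribeOrderDemo {

    // Hypothetical mini bus: only a model of a publish-style processor.
    static final class MiniBus {
        private final List<Consumer<Integer>> listeners = new ArrayList<>();

        // Delivers the value to the listeners registered at this moment.
        void emit(int value) {
            for (Consumer<Integer> listener : new ArrayList<>(listeners)) {
                listener.accept(value);
            }
        }

        // Assumed ordering: the hook runs first, the listener is registered afterwards.
        void subscribe(Runnable onSubscribeHook, Consumer<Integer> listener) {
            onSubscribeHook.run();
            listeners.add(listener);
        }
    }

    static List<Integer> run() {
        MiniBus bus = new MiniBus();
        List<Integer> seen = new ArrayList<>();
        // 1 is emitted from inside the hook, while no listener is registered yet.
        bus.subscribe(() -> bus.emit(1), seen::add);
        bus.emit(2);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2]
    }
}
```

In this model, anything emitted from inside the hook is lost, which matches the output above.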
As a workaround I use this approach:
processor
.mergeWith(Completable.fromAction(() -> processor.onNext(1)).toFlowable())
.subscribe(System.out::println);
This prints
1
2
as expected
@akarnokd I have two event-bus-like processors. One is for requests, the second is for results. There may be multiple request handlers which post results to the result processor, either synchronously or asynchronously. I want to make a request and wait for the result in the same flowable.
Simplified case
PublishProcessor<Integer> requestBus = PublishProcessor.create();
PublishProcessor<Pair<Integer, Integer>> resultBus = PublishProcessor.create();
requestBus.filter(value -> value == 1)
.map(value -> Pair.create(value, 10))
.subscribe(resultBus);
Integer request = 1;
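Flowable<Integer> resultFlowable = resultBus
// compare boxed Integers with equals(), not ==
.filter(requestResultPair -> requestResultPair.first.equals(request))
.doOnSubscribe(__ -> requestBus.onNext(request))
// unwrap the result so the element type matches Flowable<Integer>
.map(requestResultPair -> requestResultPair.second);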
BehaviorSubject flatMapped in instead of the constant and subscribe the BehaviorSubject to the shared observable in publish.
Hello.
I'm trying to implement a functionality which can be described as "striped" Observer. The idea is pretty simple, I want Observers (subscriptions) for a “stripe” to be single-threaded.
Stated differently, I would like to select a Scheduler (for observeOn operation) based on current event. Think something like observeOn(Function<T, Scheduler>) instead of current observeOn(Scheduler).
A simple example which seems to work, but not when called multiple times:
@Test
public void example() throws Exception {
final List<Scheduler> schedulers = Stream.generate(() -> Schedulers.from(Executors.newSingleThreadExecutor())).limit(10).collect(Collectors.toList());
final Function<String, String> keyFn = s -> s;
// select scheduler for each element
// floorMod keeps the index non-negative even when hashCode() is Integer.MIN_VALUE
final Function<String, Scheduler> schedulerFn = key -> schedulers.get(Math.floorMod(key.hashCode(), schedulers.size()));
Observable.just("one", "two", "three", "one", "two", "four")
.groupBy(i -> i) // this is value -> key function
.flatMap(g -> g.subscribeOn(schedulerFn.apply(g.getKey())))
.subscribe(e -> System.out.printf("key=%s value=%s thread=%s\n", e, e, Thread.currentThread().getName()));
Thread.sleep(1_000);
}
g.subscribeOn moves the subscription to another thread but g is a hot Observable and therefore subscribeOn has no practical effect on it. Use observeOn but note that your setup will still execute the onNext handler of subscribe sequentially. You should add computation after g.observeOn(...).op().op() before merging the results. Also note that Observable.just(g) will likely still not work as there is no reason to turn g into a nested source.
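For comparison, the striping idea itself (same key, same thread) can be sketched with plain JDK executors, without RxJava. This is only an illustration under stated assumptions: StripedExecutorDemo, stripeFor and run are hypothetical names, and single-thread executors stand in for the per-stripe Schedulers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class StripedExecutorDemo {

    // Maps a key to a stripe index; floorMod stays non-negative for negative hash codes.
    static int stripeFor(Object key, int stripes) {
        return Math.floorMod(key.hashCode(), stripes);
    }

    // Runs one task per value on the stripe chosen by its key and
    // records which thread names were observed for each key.
    static Map<String, Set<String>> run(List<String> values, int stripes) {
        List<ExecutorService> executors = new ArrayList<>();
        for (int i = 0; i < stripes; i++) {
            executors.add(Executors.newSingleThreadExecutor());
        }
        Map<String, Set<String>> threadsByKey = new ConcurrentHashMap<>();
        CountDownLatch latch = new CountDownLatch(values.size());
        try {
            for (String value : values) {
                executors.get(stripeFor(value, stripes)).execute(() -> {
                    threadsByKey.computeIfAbsent(value, k -> ConcurrentHashMap.newKeySet())
                            .add(Thread.currentThread().getName());
                    latch.countDown();
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executors.forEach(ExecutorService::shutdown);
        }
        return threadsByKey;
    }

    public static void main(String[] args) {
        run(List.of("one", "two", "three", "one", "two", "four"), 10)
                .forEach((key, threads) -> System.out.println(key + " -> " + threads));
    }
}
```

Each key should end up with exactly one thread name in its set, since every value with the same key is dispatched to the same single-thread executor.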
// consumer is "single-threaded"
Consumer<String> consumer = str -> {
state.computeIfAbsent(str, ignore -> ConcurrentHashMap.newKeySet()).add(Thread.currentThread().getName());
latch.countDown();
};
observable.compose(dynamicSchedulerSubscription(fn)).subscribe(consumer::accept);
<T, K> ObservableTransformer<GroupedObservable<K, T>, T> dynamicSchedulerSubscription(Function<K, Scheduler> fn,
Consumer<? super T> consumer) {
// observe each group on the scheduler selected by its key, then run the consumer there
return o -> o.flatMap(g -> g.observeOn(fn.apply(g.getKey())).doOnNext(consumer));
}
Observable.just("one", "two", "three", "one", "two", "four")
.groupBy(i -> i)
.compose(dynamicSchedulerSubscription(fn, consumer))
.subscribe(v -> { }, Throwable::printStackTrace);
Observable<String> singleThreaded = observable.compose(dynamicSchedulerSubscription(fn));
// somewhere (later) in the code
singleThreaded.subscribe(consumer1);
singleThreaded.subscribe(consumer2);
singleThreaded.subscribe(consumer3);
dynamicSchedulerSubscription transformer since it makes it less generic.
I've experienced unexpected behaviour using the share operator:
@Test public void testConcatShare(){
Maybe<Boolean> maybe1 = Maybe.defer(() -> {
System.out.println("Call from maybe1");
return Maybe.empty();
});
Maybe<Boolean> maybe2 = Maybe.defer(() -> {
System.out.println("Call from maybe2");
return Maybe.empty();
});
Flowable<Boolean> flowable = Maybe.concat(maybe1, maybe2).share();
flowable.subscribe(System.out::println);
flowable.subscribe(System.out::println);
}
unexpectedly prints
Call from maybe1
Call from maybe2
Call from maybe1
Call from maybe2
Should I use the cache() operator instead of share() in such a case, to avoid repeating the calls in defer?
flatMapCompletable is a performance bottleneck for you, you could implement a variant which features more than one CompositeDisposable set (as individual fields or as an array). There you don't have to change CompositeDisposable but the locations of set.add(), set.delete() and set.dispose() calls to pick the right stripe.
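A rough JDK-only sketch of what such striping could look like. StripedResourceSet is a hypothetical name, AutoCloseable stands in for Disposable, and each Stripe plays the role of one CompositeDisposable; this is an illustration of picking a stripe on add/delete/dispose, not the operator's actual code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

final class StripedResourceSet {

    // One stripe: a small set guarded by its own lock (stand-in for one CompositeDisposable).
    private static final class Stripe {
        private final Set<AutoCloseable> items = new HashSet<>();
        private boolean disposed;

        synchronized boolean add(AutoCloseable r) { return !disposed && items.add(r); }

        synchronized boolean delete(AutoCloseable r) { return items.remove(r); }

        // Marks the stripe as disposed and hands back whatever it still held.
        synchronized List<AutoCloseable> drain() {
            disposed = true;
            List<AutoCloseable> copy = new ArrayList<>(items);
            items.clear();
            return copy;
        }
    }

    private final Stripe[] stripes;

    StripedResourceSet(int stripeCount) {
        stripes = new Stripe[stripeCount];
        for (int i = 0; i < stripeCount; i++) stripes[i] = new Stripe();
    }

    // Picks the stripe from the resource's hash so add and delete hit the same stripe.
    private Stripe stripeOf(Object r) {
        return stripes[Math.floorMod(r.hashCode(), stripes.length)];
    }

    // Returns false if the stripe was already disposed; the caller then closes r itself.
    boolean add(AutoCloseable r) { return stripeOf(r).add(r); }

    // Removes r without closing it.
    boolean delete(AutoCloseable r) { return stripeOf(r).delete(r); }

    // Closes everything still held, stripe by stripe.
    void dispose() {
        for (Stripe s : stripes) {
            for (AutoCloseable r : s.drain()) {
                try { r.close(); } catch (Exception ignored) { /* sketch: swallow close errors */ }
            }
        }
    }

    // Small demo: three resources added, one deleted, the rest closed by dispose().
    static int demo() {
        AtomicInteger closed = new AtomicInteger();
        StripedResourceSet set = new StripedResourceSet(4);
        List<AutoCloseable> resources = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            AutoCloseable r = new AutoCloseable() {
                @Override public void close() { closed.incrementAndGet(); }
            };
            resources.add(r);
            set.add(r);
        }
        set.delete(resources.get(1));
        set.dispose();
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2
    }
}
```

Contention drops because threads touching different stripes take different locks; the cost is that dispose() has to walk every stripe.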