These are chat archives for ReactiveX/RxJava

2nd
Mar 2018
Andrei Sereda
@asereda-gs
Mar 02 2018 06:32
Thanks David. What I'm looking is something like this:
// consumer is "single-threaded"
Consumer<String> consumer = str -> {
  state.computeIfAbsent(str, ignore -> ConcurrentHashMap.newKeySet()).add(Thread.currentThread().getName());
  latch.countDown();
};

observable.compose(dynamicSchedulerSubscription(fn)).subscribe(consumer::accept);
private static <T, K> Observable.Transformer<T, T> dynamicSchedulerSubscription(Function<T, Scheduler> fn) {
// this doesn't work
return obs -> obs.flatMap(o -> Observable.just(o).observeOn(fn.apply(o)));
}
with doOnNext() I can't really hook Observers.
Perhaps I'm missing something ?
David Karnok
@akarnokd
Mar 02 2018 08:30
<T, K> ObservableTransformer<GroupedObservable<T>, T> dynamicSchedulerSubscription(Function<K, Scheduler> fn,
        Consumer<? super T> consumer {
    return o -> o.flatMap(g -> g.observeOn(schedulerFn.apply(g.getKey())).doOnNext(consumer));
}

    Observable.just("one", "two", "three", "one", "two", "four")
            .groupBy(i -> i)
            .compose(dynamicSchedulerSubscription(fn, consumer))
            .subscribe(v -> { }, Throwable::printStackTrace);
Andrei Sereda
@asereda-gs
Mar 02 2018 09:14
But what if I don't know consumer in advance ?
ideally, I would like to return
observable.compose(dynamicSchedulerSubscription(fn))
then clients subscribe with (perhaps) different consumers
Andrei Sereda
@asereda-gs
Mar 02 2018 09:21
ie I would like to have a Transformer (or Operator) which would force onNext() execution on particular Scheduler
maybe there will be multiple subscribers.
But coupling Observable.Transformer and Consumer doesn't feel RXish
pls correct me if I'm wrong
Observable<String> singleThreaded = observable.compose(dynamicSchedulerSubscription(fn));
// somewhere (later) in the code
singleThreaded.subscribe(consumer1);
singleThreaded.subscribe(consumer2);
singleThreaded.subscribe(consumer3);
David Karnok
@akarnokd
Mar 02 2018 10:42
Those consumers will be invoked in a sequential manner, so thread hopping between two invocation serves no purpose as those still won't happen concurrently with each other within a subscriber.
Andrei Sereda
@asereda-gs
Mar 02 2018 11:07
what about something like that :
// compose() or lift()
observable1.compose(dynamicSchedulerSubscription(fn)).subscribe(consumer::accept);
observable2.compose(dynamicSchedulerSubscription(fn)).subscribe(consumer::accept);
observable3.compose(dynamicSchedulerSubscription(fn)).subscribe(consumer::accept);
David Karnok
@akarnokd
Mar 02 2018 12:12
Makes no sense if you just fork and join back the flow without doing anything inside the forks, which would require some external callbacks provided to the dynamicSchedulerSubscription, i.e., consumer.
Andrei Sereda
@asereda-gs
Mar 02 2018 18:25
thanks for clarification, David. And sorry if I ask too many questions.
the usecase I have is that my consumer(s) is single-threaded (for a particular key) so I can either wrap consumer in StripedLock or schedule execution on a striped single-thread scheduler.
Ideally I didn't want to expose consumer inside dynamicSchedulerSubscription transformer since it makes it less generic.
But it seems there is no way around it ?