These are chat archives for ReactiveX/RxJava

1st
Mar 2018
asereda-gs
@asereda-gs
Mar 01 2018 03:56

Hello.

I'm trying to implement a functionality which can be described as "striped" Observer. The idea is pretty simple, I want Observers (subscriptions) for a “stripe” to be single-threaded.

Stated differently, I would like to select a Scheduler (for observeOn operation) based on current event. Think something like observeOn(Function<T, Scheduler>) instead of current observeOn(Scheduler).

Simple example which seem to work but not when called multiple times:

@Test
public void example() throws Exception {
final List<Scheduler> schedulers = Stream.generate(() -> Schedulers.from(Executors.newSingleThreadExecutor())).limit(10).collect(Collectors.toList());
final Function<String, String> keyFn = s -> s;

// select scheduler for each element
final Function<String, Scheduler> schedulerFn = key -> schedulers.get(Math.abs(key.hashCode()) % schedulers.size());

Observable.just("one", "two", "three", "one", "two", "four")
        .groupBy(i -> i) // this is value -> key function
        .flatMap(g -> g.subscribeOn(schedulerFn.apply(g.getKey())))
        .subscribe(e -> System.out.printf("key=%s value=%s thread=%s\n", e, e, Thread.currentThread().getName()));

Thread.sleep(1_000);

}

Unfortunately this doesn’t work if called multiple times. I have created a test:
Any help would be appreciated
Eugene Popovich
@httpdispatch
Mar 01 2018 07:46
@asereda-gs use observeOn insteadof subscribeOn
or create new observable in the flatMap
Observable.just(g).subscribeOn(schedulerFn.apply(g.getKey())
David Karnok
@akarnokd
Mar 01 2018 08:47
g.subscribeOn moves the subscription to another thread but g is a hot Observable and therefore subscribeOn has no practical effect on it. Use observeOn but note that your setup will still execute the onNext handler of subscribe sequentially. You should add computation after g.observeOn(...).op().op() before merging the results. Also note that Observable.just(g) will likely still not work as there is no reason to turn g into a nested source.
asereda-gs
@asereda-gs
Mar 01 2018 15:23
@akarnokd something like this ?
observable.groupBy(keyFn::apply)
.flatMap(o -> o.observeOn(fn.apply(o.getKey())).map(dummy -> dummy))
.subscribe(e -> {
state.computeIfAbsent(e, ignore -> new CopyOnWriteArraySet<>()).add(Thread.currentThread().getName());
latch.countDown();
});
if the problem in Observable.just() for test as the source ?
@httpdispatch I did try with both observeOn() and subscribeOn() same result
asereda-gs
@asereda-gs
Mar 01 2018 15:28
Observable.just("one", "two", "three", "one", "two", "four")
        .groupBy(i -> i)
        .flatMap(g -> g.observeOn(schedulerFn.apply(g.getKey())).map(dummy -> dummy))
        .subscribe(e -> System.out.printf("key=%s value=%s thread=%s\n", e, e, Thread.currentThread().getName()));
key=one value=one thread=pool-3-thread-1
key=one value=one thread=pool-3-thread-1
key=two value=two thread=pool-3-thread-1
key=two value=two thread=pool-3-thread-1
key=four value=four thread=pool-3-thread-1
key=three value=three thread=pool-3-thread-1
I would like subscription to happen in different threads
David Karnok
@akarnokd
Mar 01 2018 16:44
FlatMap can reuse the same thread to emit items coming from different threads. You should put the computation into that map(dummy -> dummy) part.
asereda-gs
@asereda-gs
Mar 01 2018 19:08
How can I "isolate" that functionality as
Observable Transformer / Operation ?
Observable.compose( .... ).subscribe( )
David Karnok
@akarnokd
Mar 01 2018 20:20
.flatMap(g -> g.observeOn(schedulerFn.apply(g.getKey())).doOnNext(e -> System.out.printf("key=%s value=%s thread=%s\n", e, e, Thread.currentThread().getName())))