RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Hello,
I have a Flowable that receives inbound events. These events do not arrive in order, but subscribers are expected to receive them in order (ascending order of event ID).
This is an infinite stream, and it may sometimes be too large to be stored in memory.
How can I achieve this using RxJava or without Rx?
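One way to approach the "without Rx" variant of this question is a small reorder buffer. The sketch below is only an illustration and rests on assumptions the question does not state: event IDs are consecutive longs, the first ID is known up front, and late duplicates can be ignored. The class name ReorderBuffer and its methods are hypothetical, not part of RxJava.

```java
import java.util.TreeMap;
import java.util.function.Consumer;

/** Buffers out-of-order events and releases them in ascending, gap-free ID order. */
final class ReorderBuffer<T> {
    // Events that arrived early and are waiting for a missing predecessor.
    private final TreeMap<Long, T> pending = new TreeMap<>();
    private final Consumer<T> downstream;
    // Assumption: IDs are consecutive and start at the value passed to the constructor.
    private long nextId;

    ReorderBuffer(long firstId, Consumer<T> downstream) {
        this.nextId = firstId;
        this.downstream = downstream;
    }

    /** Accepts one event; synchronized so producers on different threads cannot interleave. */
    synchronized void accept(long id, T event) {
        if (id < nextId) {
            return; // already delivered (duplicate or stale): ignored in this sketch
        }
        pending.put(id, event);
        // Deliver every buffered event that is now contiguous with the last delivered ID.
        while (!pending.isEmpty() && pending.firstKey() == nextId) {
            downstream.accept(pending.pollFirstEntry().getValue());
            nextId++;
        }
    }

    /** Number of events currently held back because of a gap. */
    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReorderBuffer<String> buffer = new ReorderBuffer<>(1, System.out::println);
        buffer.accept(3, "c"); // held back: 1 and 2 are missing
        buffer.accept(1, "a"); // prints a
        buffer.accept(2, "b"); // prints b, then the buffered c
    }
}
```

Memory use here is bounded only by how far out of order events can arrive, so if a gap may never be filled, some timeout or size limit on the pending map would be needed on top of this. In an Rx pipeline the accept method could be called from the subscriber's onNext handler.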
Flowable, BackpressureStrategy.DROP and the sample operator to limit the number of frames per second. I have implemented the same functionality using Observable, also limiting the number of frames with the sample operator. The second variant performs better than the first; however, I am concerned that a computer with less processing power may have backpressure issues.
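For readers unfamiliar with sampling, the idea of "keep only the latest frame and emit it once per period" can be sketched in plain Java. This is a conceptual illustration only, not how RxJava implements sample; the class LatestSampler and its methods are hypothetical names, and the periodic call to tick() is assumed to come from some timer such as a ScheduledExecutorService.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Keeps only the most recent item and hands it downstream when tick() is called. */
final class LatestSampler<T> {
    private final AtomicReference<T> latest = new AtomicReference<>();
    private final Consumer<T> downstream;

    LatestSampler(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Called by the producer at any rate; an older, not yet sampled item is overwritten. */
    void offer(T item) {
        latest.set(item);
    }

    /** Called once per sampling period; emits nothing if no item arrived since the last tick. */
    void tick() {
        T item = latest.getAndSet(null);
        if (item != null) {
            downstream.accept(item);
        }
    }
}
```

Because at most one item is ever held, a slow consumer cannot cause an unbounded queue at this point; frames that arrive between ticks are simply dropped.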
How do I properly perform an action after the subscription? doOnSubscribe doesn't fit my needs.
Example:
PublishProcessor<Integer> processor = PublishProcessor.create();
processor
    .doOnSubscribe(subscription -> processor.onNext(1))
    .subscribe(System.out::println);
processor.onNext(2);
This prints
2
As a workaround, I use this approach:
processor
    .mergeWith(Completable.fromAction(() -> processor.onNext(1)).toFlowable())
    .subscribe(System.out::println);
This prints
1
2
as expected
@akarnokd I have two event-bus-like processors: one for requests and one for results. There may be multiple request handlers, each of which posts a result to the result processor, either synchronously or asynchronously. I want to make a request and wait for the result in the same Flowable.
Simplified case
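PublishProcessor<Integer> requestBus = PublishProcessor.create();
PublishProcessor<Pair<Integer, Integer>> resultBus = PublishProcessor.create();
// Request handler: answers request 1 with result 10
requestBus.filter(value -> value == 1)
    .map(value -> Pair.create(value, 10))
    .subscribe(resultBus);
Integer request = 1;
Flowable<Integer> resultFlowable = resultBus
    // equals() rather than ==, since both sides are boxed Integers
    .filter(requestResultPair -> requestResultPair.first.equals(request))
    // unwrap the result so the element type matches Flowable<Integer>
    .map(requestResultPair -> requestResultPair.second)
    .doOnSubscribe(__ -> requestBus.onNext(request));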
BehaviorSubject flatMapped in instead of the constant and subscribe the BehaviorSubject to the shared observable in publish.
Hello.
I'm trying to implement functionality that can be described as a "striped" Observer. The idea is pretty simple: I want the Observers (subscriptions) for a “stripe” to be single-threaded.
Stated differently, I would like to select a Scheduler (for the observeOn operation) based on the current event. Think of something like observeOn(Function<T, Scheduler>) instead of the current observeOn(Scheduler).
A simple example that seems to work, but not when called multiple times:
@Test
public void example() throws Exception {
    final List<Scheduler> schedulers = Stream.generate(() -> Schedulers.from(Executors.newSingleThreadExecutor())).limit(10).collect(Collectors.toList());
    final Function<String, String> keyFn = s -> s;
    // select scheduler for each element (floorMod keeps the index non-negative for any hashCode)
    final Function<String, Scheduler> schedulerFn = key -> schedulers.get(Math.floorMod(key.hashCode(), schedulers.size()));
    Observable.just("one", "two", "three", "one", "two", "four")
        .groupBy(i -> i) // this is value -> key function
        .flatMap(g -> g.subscribeOn(schedulerFn.apply(g.getKey())))
        .subscribe(e -> System.out.printf("key=%s value=%s thread=%s\n", e, e, Thread.currentThread().getName()));
    Thread.sleep(1_000);
}
g.subscribeOn moves the subscription to another thread but g is a hot Observable and therefore subscribeOn has no practical effect on it. Use observeOn but note that your setup will still execute the onNext handler of subscribe sequentially. You should add computation after g.observeOn(...).op().op() before merging the results. Also note that Observable.just(g) will likely still not work as there is no reason to turn g into a nested source.
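The underlying "stripe" idea, where every key is always handled by the same single thread, can also be sketched without Rx using plain executors. This is a swapped-in illustration under assumptions, not the approach from the discussion: StripedExecutor and its methods are hypothetical names, and the stripe is chosen from the key's hashCode as in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Routes tasks to one of N single-threaded executors, chosen by the task's key. */
final class StripedExecutor {
    private final List<ExecutorService> stripes = new ArrayList<>();

    StripedExecutor(int stripeCount) {
        for (int i = 0; i < stripeCount; i++) {
            stripes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Maps a key to a stripe index; floorMod stays non-negative even for Integer.MIN_VALUE. */
    int stripeFor(Object key) {
        return Math.floorMod(key.hashCode(), stripes.size());
    }

    /** Runs the task on the single thread that owns the key's stripe, in submission order. */
    void execute(Object key, Runnable task) {
        stripes.get(stripeFor(key)).execute(task);
    }

    /** Stops accepting tasks and waits for queued ones; returns false on timeout or interrupt. */
    boolean shutdownAndAwait(long timeout, TimeUnit unit) {
        for (ExecutorService stripe : stripes) {
            stripe.shutdown();
        }
        try {
            for (ExecutorService stripe : stripes) {
                if (!stripe.awaitTermination(timeout, unit)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Two different keys may share a stripe, which is expected: the guarantee is only that a given key never runs on more than one thread.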
// consumer is "single-threaded"
Consumer<String> consumer = str -> {
    state.computeIfAbsent(str, ignore -> ConcurrentHashMap.newKeySet()).add(Thread.currentThread().getName());
    latch.countDown();
};
observable.compose(dynamicSchedulerSubscription(fn)).subscribe(consumer::accept);
<T, K> ObservableTransformer<GroupedObservable<K, T>, T> dynamicSchedulerSubscription(
        Function<K, Scheduler> fn, Consumer<? super T> consumer) {
    // observe each group on the scheduler chosen for its key, then run the consumer on that thread
    return o -> o.flatMap(g -> g.observeOn(fn.apply(g.getKey())).doOnNext(consumer));
}
Observable.just("one", "two", "three", "one", "two", "four")
    .groupBy(i -> i)
    .compose(dynamicSchedulerSubscription(fn, consumer))
    .subscribe(v -> { }, Throwable::printStackTrace);
Observable<String> singleThreaded = observable.compose(dynamicSchedulerSubscription(fn));
// somewhere (later) in the code
singleThreaded.subscribe(consumer1);
singleThreaded.subscribe(consumer2);
singleThreaded.subscribe(consumer3);
dynamicSchedulerSubscription transformer since it makes it less generic.