RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
The governing rule here is 1.6:
"If a Publisher signals either onError or onComplete on a Subscriber, that Subscriber’s Subscription MUST be considered cancelled.
The intent of this rule is to make sure that a Subscription is treated the same no matter if it was cancelled, the Publisher signalled onError or onComplete."
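To make rule 1.6 concrete, here is a minimal sketch using the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams ones. RangeSubscription and RecordingSubscriber are hypothetical names made up for this illustration. The sketch deliberately skips most other spec rules, for example validating non-positive requests and thread safety.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/**
 * Hypothetical, simplified source of the integers 1..count.
 * Not a fully spec-compliant Publisher; it only illustrates rule 1.6.
 */
final class RangeSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> downstream;
    private final int count;
    private int emitted;
    private boolean cancelled;

    RangeSubscription(Flow.Subscriber<? super Integer> downstream, int count) {
        this.downstream = downstream;
        this.count = count;
    }

    @Override
    public void request(long n) {
        // Once cancelled (explicitly or by a terminal signal), requests are ignored.
        while (n-- > 0 && emitted < count && !cancelled) {
            downstream.onNext(++emitted);
        }
        if (emitted == count && !cancelled) {
            // Rule 1.6: signalling onComplete means the Subscription counts as cancelled.
            cancelled = true;
            downstream.onComplete();
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }

    boolean isCancelled() {
        return cancelled;
    }
}

/** Hypothetical subscriber that records every signal it receives. */
final class RecordingSubscriber implements Flow.Subscriber<Integer> {
    final List<String> events = new ArrayList<>();

    @Override public void onSubscribe(Flow.Subscription s) { events.add("onSubscribe"); }
    @Override public void onNext(Integer item) { events.add("onNext(" + item + ")"); }
    @Override public void onError(Throwable t) { events.add("onError"); }
    @Override public void onComplete() { events.add("onComplete"); }
}
```

After onComplete has been signalled, further request or cancel calls on this subscription are no-ops, which is the same behaviour the subscriber would see if it had called cancel itself.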
People coming from different backgrounds tend to get confused by different naming. There is often discussion about how to name components so that the name doesn't imply too much but doesn't go under the radar either. setCancellable follows the typical bean-setter naming, indicating that you set some object on this emitter.
BehaviorSubject<T> valueSubject = (BehaviorSubject<T>) BehaviorSubject.create(new ObservableOnSubscribe<T>() {
    @Override
    public void subscribe(ObservableEmitter<T> emitter) throws Exception {
        // do something here
    }
});
io.reactivex.internal.operators.observable.ObservableCreate cannot be cast to io.reactivex.subjects.BehaviorSubject
The create(ObservableOnSubscribe) method is defined on Observable; it creates an Observable, a cold source, and has nothing to do with the subject type.
Many operators in RxJava are named after what they do in everyday language, such as "actions done when an observer subscribes" -> doOnSubscribe, or when the subscription gets disposed: doOnDispose. Subjects themselves are not disposed.
@RequestMapping(method = RequestMethod.POST, value = "/pushData", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public Single<String> pushData(@RequestBody String json)
How can I reduce the pressure on the logic inside this method? Is this the right way of thinking, or should I slow it down at the level of Undertow/Jetty/Netty by applying some strategy there?
Single implies a single result, so backpressure is not really at play there. If you want to reduce the call frequency to pushData itself, there are ways to do it, such as using concatMap or flatMap with a concurrency limit, spreading out the source events over time, etc. You could also research Retrofit use cases similar to your problem.
public Single<String> pushJsonDataAsync(String json) {
    return Single.create(source -> {
        try {
            PushCrawlerDataRequest.Builder request = PushCrawlerDataRequest.newBuilder();
            JsonFormat.parser().merge(json, request);
            PushCrawlerDataResponse.Builder builder = pushData(request.build());
            source.onSuccess(JsonFormat.printer().print(builder));
        } catch (Exception ex) {
            source.onError(ex);
        }
    });
}
this is how I'm trying to use it right now
hello,
I have a Flowable that receives inbound events. These events do not arrive in order, but they are expected to reach subscribers in ascending order of event ID.
This is an infinite stream, and it may sometimes be too large to be stored in memory.
How can I achieve this using rxJava or without rx?
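One possible approach to the ordering question above, sketched without Rx. It assumes that event IDs are consecutive numbers starting from a known first ID, that events are never null, and that late or duplicate IDs can be dropped. None of these assumptions is stated in the question. ReorderBuffer is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: releases events in ascending ID order, assuming consecutive IDs. */
final class ReorderBuffer<T> {
    private final Map<Long, T> pending = new HashMap<>();
    private long nextId;

    ReorderBuffer(long firstId) {
        this.nextId = firstId;
    }

    /** Accepts one event and returns every event that can now be emitted in order. */
    synchronized List<T> accept(long id, T event) {
        List<T> ready = new ArrayList<>();
        if (id < nextId) {
            return ready; // already emitted: treated as a duplicate and dropped (assumption)
        }
        pending.put(id, event);
        T next;
        // Drain the run of consecutive IDs that starts at the next expected one.
        while ((next = pending.remove(nextId)) != null) {
            ready.add(next);
            nextId++;
        }
        return ready;
    }

    /** Number of events still waiting for a gap to be filled. */
    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReorderBuffer<String> buffer = new ReorderBuffer<>(1);
        long[] ids = {3, 1, 2, 5, 4};
        for (long id : ids) {
            System.out.println("in " + id + " -> out " + buffer.accept(id, "event-" + id));
        }
    }
}
```

With RxJava, something like source.concatMapIterable(e -> buffer.accept(e.getId(), e)) could feed the reordered events downstream, where getId is a hypothetical accessor. Note that the pending map grows without bound while a gap stays open, so a truly unbounded stream would still need a cap or a timeout policy on top of this.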
Flowable, BackpressureStrategy.DROP and the sample operator to limit the number of frames per second. I have implemented the same functionality using Observable, also limiting the number of frames with the sample operator. The second variant performs better than the first; however, I am concerned that a computer with less processing power may run into backpressure issues.
How do I properly perform an action after the subscription? doOnSubscribe doesn't fit my needs.
Example
PublishProcessor<Integer> processor = PublishProcessor.create();
processor
    .doOnSubscribe(subscription -> processor.onNext(1))
    .subscribe(System.out::println);
processor.onNext(2);
prints
2
As a workaround I use this approach
processor
    .mergeWith(Completable.fromAction(() -> processor.onNext(1)).toFlowable())
    .subscribe(System.out::println);
This prints
1
2
as expected
@akarnokd I have two event-bus-like processors: one for requests and one for results. There may be multiple request handlers, each of which posts its results to the result processor, either synchronously or asynchronously. I want to make a request and wait for the result in the same Flowable.
Simplified case
PublishProcessor<Integer> requestBus = PublishProcessor.create();
PublishProcessor<Pair<Integer, Integer>> resultBus = PublishProcessor.create();
requestBus.filter(value -> value == 1)
    .map(value -> Pair.create(value, 10))
    .subscribe(resultBus);
Integer request = 1;
Flowable<Integer> resultFlowable = resultBus
    .filter(requestResultPair -> requestResultPair.first == request)
    .doOnSubscribe(__ -> requestBus.onNext(request));
BehaviorSubject flatMapped in instead of the constant and subscribe the BehaviorSubject to the shared observable in publish.
Hello.
I'm trying to implement a functionality which can be described as "striped" Observer. The idea is pretty simple, I want Observers (subscriptions) for a “stripe” to be single-threaded.
Stated differently, I would like to select a Scheduler (for the observeOn operation) based on the current event. Think of something like observeOn(Function<T, Scheduler>) instead of the current observeOn(Scheduler).
A simple example which seems to work, but not when called multiple times:
@Test
public void example() throws Exception {
    final List<Scheduler> schedulers = Stream.generate(() -> Schedulers.from(Executors.newSingleThreadExecutor())).limit(10).collect(Collectors.toList());
    final Function<String, String> keyFn = s -> s;
    // select scheduler for each element; floorMod keeps the index non-negative for any hash code
    final Function<String, Scheduler> schedulerFn = key -> schedulers.get(Math.floorMod(key.hashCode(), schedulers.size()));
    Observable.just("one", "two", "three", "one", "two", "four")
        .groupBy(i -> i) // this is value -> key function
        .flatMap(g -> g.subscribeOn(schedulerFn.apply(g.getKey())))
        .subscribe(e -> System.out.printf("key=%s value=%s thread=%s\n", e, e, Thread.currentThread().getName()));
    Thread.sleep(1_000);
}
g.subscribeOn moves the subscription to another thread, but g is a hot Observable and therefore subscribeOn has no practical effect on it. Use observeOn, but note that your setup will still execute the onNext handler of subscribe sequentially. You should add computation after g.observeOn(...).op().op() before merging the results. Also note that Observable.just(g) will likely still not work as there is no reason to turn g into a nested source.
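The "stripe" selection itself can be sketched with plain JDK executors, independent of RxJava. StripedExecutor is a hypothetical name, and the 5-second shutdown timeout is an arbitrary choice for the illustration. Tasks submitted with the same key always land on the same single-threaded executor, so they run one after another on one thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: routes tasks to single-threaded executors based on a key. */
final class StripedExecutor {
    private final List<ExecutorService> stripes = new ArrayList<>();

    StripedExecutor(int stripeCount) {
        for (int i = 0; i < stripeCount; i++) {
            stripes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** floorMod keeps the index non-negative even when hashCode() is Integer.MIN_VALUE. */
    int stripeIndex(Object key) {
        return Math.floorMod(key.hashCode(), stripes.size());
    }

    /** Runs the task on the stripe that owns the key. */
    void execute(Object key, Runnable task) {
        stripes.get(stripeIndex(key)).execute(task);
    }

    /** Shuts all stripes down; returns true if every stripe finished within the timeout. */
    boolean shutdownAndAwait() {
        for (ExecutorService stripe : stripes) {
            stripe.shutdown();
        }
        try {
            for (ExecutorService stripe : stripes) {
                if (!stripe.awaitTermination(5, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In the RxJava setup from the question, each stripe could be wrapped with Schedulers.from(...) and applied per group via observeOn, as suggested in the reply above.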