RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
In an application, we can have multiple different streams. For example, let say we have the following Completable:
Completable ccompletable1 = message_queue1
.flatMapSingle(....)
....
.flatMapCompletable(....);
Completable ccompletable2 = message_queue2
.flatMapSingle(....)
.....
.flatMapCompletable(....);
Which of the following options are more efficient?
// subscribing stream separately
completable1.subscribe();
completable2.subscribe();
Or
// chain them together
completable1.
.mergeWith(completable2)
.subscribe()
Generally, what is the best practice for that? For example, in a project, it's better to chain all streams and subscribe once or it's better to subscribe to them separately?
Hi, Supposing that we have the following Flowable
message_queue
.filter(message -> ....)
.flatMapMaybe(....)
.flatMapMaybe(....)
.map(....)
.flatMapMaybe(....)
.flatMapMaybe(....)
.flatMapSingle(....)
.flatMapCompletable(....);
I wanted to know is there a way to configure it such that if any error happened in processing the items in the entire flow, just the item should be ignored and resume to next. I need something like onErrorResumeNext()
without any argument, but it seems in onErrorResumeNext
we should set a new Flowable (publisher) to be substituted in time of the error. So, in summary, my questions are how we can just ignore the item in a time of error in that flow and also where we should call that operator to take care of all the flow, for example, if we add the error handling operator at the end of the flow (here after flatMapCompletable()
) does it handle errors if any error happened inside the first operator which is filter
? If yes, can we call onErrorComplete()
after the flatMapCompletable
to catch all errors and ignore the item?
Thanks
@CheckReturnValue
@NonNull
public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch) {
ObjectHelper.requireNonNull(scheduler, "scheduler");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new ParallelRunOn<T>(this, scheduler, prefetch));
}
runOn
, similar to observeOn
provides an async boundary where each side could go at its own pace. However, when the producer side is fast responding, requesting one by one would be a performance hog. Therefore, the operator has an internal buffer that gets filled and drained so that requesting more items happens less frequently. The prefetch
allows you to control the length of this buffer. When the computation after runOn
takes long time or is imbalanced, you may want to keep this value low to ensure rails get roughly equal chance to work on more items in parallel.
@return
but I am not convinced for when there are several exceptions, do you know of any example or project that documents the exceptions to be able to have it as a reference?
I am writing an undergraduate theses at Mid Sweden University which is investigating the difference in eye-gaze of programmers while reading imperative and reactive code. I would love some volunteers to perform the experiment. It is done online by solving code snippets of the two paradigms and takes about 15 minutes all up.
No video is recorded or sent away. All eye-gaze predictions is done client side in your own browser.
You need a computer in a well lit room with a web camera and you should be familiar with both Java and RxJava.
The experiment is hosted on the university server and all instructions are given there.
https://studenter.miun.se/~evth1400/EyeGazeStudy/?group=4
I am very grateful if anyone would like to partake.
fun throttledClick(
targetView: View,
completionMethod: () -> Unit,
throttleWindow: Long = DOUBLE_CLICK_THROTTLE_WINDOW
) {
RxView.clicks(targetView)
.throttleFirst(throttleWindow, MILLISECONDS)
.subscribe {
completionMethod.invoke()
}
}
Flowable.error
was not passed to onError
and Firebase reported the crash. This question is posted here, what is the cause of this problem?
this.disposableSubscriptionOnUpstream = Flowable.fromPublisher(upstream)
.onBackpressureBuffer(
1000
, () -> {
messagesDropped.incrementAndGet();
log.error("буфер для канала телеметрии {} переполнен"
, channelId);
}
, BackpressureOverflowStrategy.DROP_OLDEST)
.observeOn(this.scheduler)
.subscribeOn(this.scheduler)
.subscribe(this::onNext, this::onError,this::onComplete,this::onSubscribe);
Could not deliver value due to lack of requests
Flowable
is disposed . I feel like it is related to the groupBy
. any tips on on what might be wrong? Tips on how to debug ? Common areas people create Deadlock with Flowable
?/* 100 */ final io.reactivex.rxjava3.core.Flowable _inputFlowable = new com.twilio.raas.reactive.KuduFlowable( [109/1945]
/* 101 */ (com.twilio.kudu.sql.CalciteKuduTable) root.getRootSchema().getSubSchema("kudu").getTable("ReportCenter.UsageReportTransactions"),
/* 102 */ false,
/* 103 */ v0stashed,
/* 104 */ v2stashed,
/* 105 */ v1stashed,
/* 106 */ new org.apache.calcite.linq4j.function.Function1(){
/* 107 */ public Object apply(final Object abstractRow) {
/* 108 */ return new Object[] {
/* 109 */ ( (org.apache.kudu.client.RowResult) abstractRow).getInt(0),
/* 110 */ ( (org.apache.kudu.client.RowResult) abstractRow).getDecimal(1)};
/* 111 */ }
/* 112 */
/* 113 */ },
/* 114 */ new org.apache.calcite.linq4j.function.Predicate1(){
/* 115 */ public boolean apply(final Object abstractRow) {
/* 116 */ return true;
/* 117 */ }
/* 118 */
/* 119 */ }).subscribeOn(io.reactivex.rxjava3.schedulers.Schedulers.io(), false).groupBy(com.twilio.calcite.rxjava.rel.RxAggregate.lixSelector(new org.apache.calcite.linq4j.function.Function1(
) {
/* 120 */ public int apply(Object[] a0) {
/* 121 */ return org.apache.calcite.runtime.SqlFunctions.toInt(a0[0]);
/* 122 */ }
/* 123 */ public Object apply(Object a0) {
/* 124 */ return apply(
/* 125 */ (Object[]) a0);
/* 126 */ }
/* 127 */ }
/* 128 */ )).flatMap(com.twilio.calcite.rxjava.rel.RxAggregate.lixReduceGroup(com.twilio.calcite.rxjava.rel.RxAggregate.lixSeedSupplier(lambdaFactory.accumulatorInitializer()), com.twilio.calcit
e.rxjava.rel.RxAggregate.lixReducer(lambdaFactory.accumulatorAdder()), lambdaFactory.resultSelector(new org.apache.calcite.linq4j.function.Function2() {
/* 129 */ public Object[] apply(int key, Record2_0 acc) {
/* 130 */ return new Object[] {
/* 131 */ key,
/* 132 */ acc.f0,
/* 133 */ acc.f1};
/* 134 */ }
/* 135 */ public Object[] apply(Integer key, Record2_0 acc) {
/* 136 */ return apply(
/* 137 */ key.intValue(),
/* 138 */ acc);
/* 139 */ }
/* 140 */ public Object[] apply(Object key, Object acc) {
/* 141 */ return apply(
/* 142 */ (Integer) key,
/* 143 */ (Record2_0) acc);
/* 144 */ }
/* 145 */ }
/* 146 */ )));
/* 147 */ return _inputFlowable.map(com.twilio.calcite.rxjava.rel.RxCalc.calcMapFunction(lixMapper));
Hello I'm trying to page a database using Flowable but in the last page it doesn't signal on Complete and the Flowable never ends
public static <T> Flowable<T> flowable(CompletionStage<MappedAsyncPagingIterable<T>> stage) {
return Single.fromCompletionStage(stage)
.toFlowable()
.flatMap(iterable -> {
final Flowable<T> fromIterable = Flowable.fromIterable(iterable.currentPage());
if (iterable.hasMorePages()) {
final Flowable<T> flowable = flowable(iterable.fetchNextPage());
return Flowable.concat(fromIterable, flowable);
} else {
return fromIterable;
}
});
}
I tried when iterable.hasMorePages() == false to concat the fromIterable Flowable with a Flowable.empty(), but it didn't work, onComplete never get invoked
this instanceof Fuseable
and then cast to QueueSubscription
along the way