RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Hi everybody,
I'm looking for a way to handle errors when a connection exception happens. In my case, the connection is established to RMQ:
rabbitMQClient.rxStart()
.doOnComplete(() -> logger.info("RMQ Client connected successfully"))
.doOnError(error -> logger.error("RMQ connection error: ", error))
.retryWhen(throwableFlowable -> throwableFlowable.delay(5, TimeUnit.SECONDS))
....
The above chain is of type Completable. My desired logic is that, when a connection exception occurs, it should try to reconnect with a customized delay. For example, the first time it waits 1 second and then tries to connect, the second time it waits 2 seconds and then tries to reconnect, and so on, up to a maximum number of retries.
The current version tries to reconnect every 5 seconds. Can someone give me a hint on how to implement the behavior I described?
Thanks
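For the increasing-delay retry described above, one possible approach is to keep the delay policy in a small helper and consult it from retryWhen. The sketch below is plain Java: LinearBackoff and its methods are hypothetical names, not part of RxJava or the RMQ client, and the retryWhen wiring in the trailing comment is an untested assumption about how the helper might be plugged in (it relies on RxJava's Flowable.timer and Flowable.error).

```java
/** Hypothetical helper: waits 1 step, then 2 steps, ... up to maxRetries attempts. */
final class LinearBackoff {
    private final int maxRetries;
    private final long stepSeconds;

    LinearBackoff(int maxRetries, long stepSeconds) {
        if (maxRetries < 0 || stepSeconds < 0) {
            throw new IllegalArgumentException("maxRetries and stepSeconds must be >= 0");
        }
        this.maxRetries = maxRetries;
        this.stepSeconds = stepSeconds;
    }

    /** True while the given 1-based retry attempt is still allowed. */
    boolean hasRetriesLeft(int attempt) {
        return attempt >= 1 && attempt <= maxRetries;
    }

    /** Delay in seconds before the given 1-based retry attempt. */
    long delaySecondsFor(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        if (attempt > maxRetries) {
            throw new IllegalStateException("retries exhausted after " + maxRetries + " attempts");
        }
        return Math.multiplyExact((long) attempt, stepSeconds);
    }
}

// Possible wiring (assumption, not verified against the client in the question):
//
//   LinearBackoff policy = new LinearBackoff(5, 1);
//   rabbitMQClient.rxStart()
//       .retryWhen(errors -> {
//           AtomicInteger attempt = new AtomicInteger();   // one counter per subscription
//           return errors.flatMap(err -> {
//               int n = attempt.incrementAndGet();
//               return policy.hasRetriesLeft(n)
//                   ? Flowable.timer(policy.delaySecondsFor(n), TimeUnit.SECONDS)
//                   : Flowable.<Long>error(err);           // give up: pass the last error on
//           });
//       });
```

With this kind of wiring, once the retries are exhausted the last error is passed downstream, so the subscriber's error handler still sees it.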
public void listen() {
myflux
.publishOn(myscheduler) // publish on my scheduler
.flatMap(this::notifyTestService) // call notify test service method
.transform(this::trackEventCompletionTime) // time event with field
.subscribe(); // subscribe event
}
private Mono<TestMono> notifyTestService(TestEvent testEvent) {
return Mono.just(testEvent) //create a mono from event
.map(testEventTransformer) // transform the event
.flatMap(snsPublisher::publish) // publish the transformed event
.then(Mono.just(testEvent)); //return back the original event.
}
Maybe.concat(listOfMaybes).toList().toMaybe()
I have a shared object between chains of Completables joined with concatWith. In chain1 I updated some fields of the shared object, but it seems that in the second chain those updates are not visible and I get the old values for the updated fields. Since the concatWith calls are executed sequentially, I expected those updates to be accessible.
My question is: is this how the concatWith operator works, or did I miss something?
Thanks
@akarnokd Thanks for your attention.
Here is an example
User user = new User();
conn.rxInsertUser()
.map(userId -> {
// setId returns void
user.setId(userId);
return user;})
.ignoreElement()
// inside the concatWith the user object has no id, same with the andThen method
.concatWith(user ... //)
.toSingleDefault(user)
// but here inside the map, the user object has the id field set
.map(user ... //)
As I commented in the code above, inside the concatWith (and likewise with andThen) I cannot see the updated user instance, but inside the last map I can see the updated user.
I only want to know more about the behavior of these operators and how such a scenario can be explained.
Thanks
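One possible explanation, assuming the argument passed to concatWith (or andThen) reads the user object directly: that argument expression is evaluated when the chain is assembled, before rxInsertUser has run, whereas the lambda inside map runs only when the item arrives. The plain-Java sketch below illustrates the difference between eager and deferred evaluation. AssemblyTimeDemo and User are hypothetical stand-ins, and no RxJava is involved.

```java
import java.util.function.Supplier;

final class AssemblyTimeDemo {
    /** Hypothetical stand-in for the shared object in the question. */
    static final class User {
        Long id;
    }

    /** Reads user.id immediately, like an argument expression evaluated at assembly time. */
    static String describeEager(User user) {
        return "id=" + user.id;
    }

    /** Captures the user but reads user.id only when get() is called, like a deferred step. */
    static Supplier<String> describeDeferred(User user) {
        return () -> "id=" + user.id;
    }

    static String[] run() {
        User user = new User();
        String eager = describeEager(user);                   // evaluated now, id is still null
        Supplier<String> deferred = describeDeferred(user);   // nothing is read yet
        user.id = 42L;                                        // the "insert" step sets the id later
        return new String[] { eager, deferred.get() };        // deferred read sees the new id
    }

    public static void main(String[] args) {
        for (String s : run()) {
            System.out.println(s);
        }
    }
}
```

In RxJava, wrapping the second step in Completable.defer(() -> ...) is the usual way to postpone reading the shared object until subscription time.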
Initialize method that only sets values for some class parameters, with no database call or anything else. The method returns Completable. In such a situation, which approach for returning a Completable is recommended: 1- using return Completable.create(emitter -> {...}), or 2- return Completable.complete() at the end?
Hi,
In the following stream, it starts as a Maybe and then I try to convert it to a Single with flatMapSingle.
getNodeId(nodeName)
.flatMap(nodeId -> ....)
.flatMapSingle(oldData -> ...)
.....
My understanding is that if the Maybe is empty, its downstream operators will not be executed, so the flatMapSingle that follows will not be triggered.
But in practice I'm getting java.util.NoSuchElementException.
Could someone explain this behavior and suggest a workaround, please?
Thanks
I need some explanation about chaining Flowables and Completables. Let's say we have the following method:
public static Completable start(Flowable<Object> message_queue) {
return message_queue
.filter(message -> ....)
.flatMapMaybe(....)
.flatMapMaybe(....)
.map(....)
.flatMapMaybe(....)
.flatMapMaybe(....)
.flatMapSingle(....)
.flatMapCompletable(....);
}
In the above method I apply some transformations to the stream of items and finally convert it to a Completable. Let's say the flatMapCompletable inserts the item into the database, which returns a Completable internally. When I subscribe to the method, it executes like a long-running stream that consumes messages from the source and inserts them into the database (which functionally is exactly what I need).
The part that I cannot explain to myself is this: when we call flatMapCompletable at the end of the chain, why doesn't the flow stop after processing the first item of the Flowable, which returns a Completable? It seems it's returning one Completable for each item in the flow. Is that so? In that case, since the method itself returns a Completable, I would expect the flow to stop and complete as soon as that Completable is returned.
Thanks
In an application, we can have multiple different streams. For example, let's say we have the following Completables:
Completable completable1 = message_queue1
.flatMapSingle(....)
....
.flatMapCompletable(....);
Completable completable2 = message_queue2
.flatMapSingle(....)
.....
.flatMapCompletable(....);
Which of the following options is more efficient?
// subscribing stream separately
completable1.subscribe();
completable2.subscribe();
Or
// chain them together
completable1
.mergeWith(completable2)
.subscribe()
Generally, what is the best practice for that? For example, in a project, is it better to chain all streams and subscribe once, or to subscribe to them separately?
Hi, suppose we have the following Flowable:
message_queue
.filter(message -> ....)
.flatMapMaybe(....)
.flatMapMaybe(....)
.map(....)
.flatMapMaybe(....)
.flatMapMaybe(....)
.flatMapSingle(....)
.flatMapCompletable(....);
I wanted to know whether there is a way to configure it so that, if any error happens while processing an item anywhere in the flow, just that item is ignored and processing resumes with the next one. I need something like onErrorResumeNext() without any argument, but it seems that onErrorResumeNext requires a new Flowable (publisher) to substitute when the error occurs. So, in summary, my questions are: how can we just ignore the item when an error occurs in that flow, and where should we call that operator so that it covers the whole flow? For example, if we add the error-handling operator at the end of the flow (here, after flatMapCompletable()), does it handle an error that happens inside the first operator, which is filter? If yes, can we call onErrorComplete() after the flatMapCompletable to catch all errors and ignore the item?
Thanks
@CheckReturnValue
@NonNull
public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch) {
ObjectHelper.requireNonNull(scheduler, "scheduler");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new ParallelRunOn<T>(this, scheduler, prefetch));
}
runOn, similar to observeOn, provides an async boundary where each side can go at its own pace. However, when the producer side responds quickly, requesting items one by one would be a performance hog. Therefore, the operator has an internal buffer that gets filled and drained so that requesting more items happens less frequently. The prefetch parameter allows you to control the length of this buffer. When the computation after runOn takes a long time or is imbalanced, you may want to keep this value low to ensure rails get a roughly equal chance to work on more items in parallel.
@return but I am not convinced about the case where there are several exceptions. Do you know of any example or project that documents its exceptions, so that I can use it as a reference?
I am writing an undergraduate thesis at Mid Sweden University which investigates the difference in eye-gaze of programmers while reading imperative and reactive code. I would love some volunteers to perform the experiment. It is done online by solving code snippets of the two paradigms and takes about 15 minutes in total.
No video is recorded or sent away. All eye-gaze predictions are done client side in your own browser.
You need a computer in a well lit room with a web camera and you should be familiar with both Java and RxJava.
The experiment is hosted on the university server and all instructions are given there.
https://studenter.miun.se/~evth1400/EyeGazeStudy/?group=4
I would be very grateful if anyone would like to take part.
fun throttledClick(
targetView: View,
completionMethod: () -> Unit,
throttleWindow: Long = DOUBLE_CLICK_THROTTLE_WINDOW
) {
RxView.clicks(targetView)
.throttleFirst(throttleWindow, MILLISECONDS)
.subscribe {
completionMethod.invoke()
}
}
Flowable.error was not passed to onError and Firebase reported the crash. This question is posted here; what is the cause of this problem?
this.disposableSubscriptionOnUpstream = Flowable.fromPublisher(upstream)
    .onBackpressureBuffer(
        1000,
        () -> {
            messagesDropped.incrementAndGet();
            log.error("buffer for telemetry channel {} is overflowed", channelId);
        },
        BackpressureOverflowStrategy.DROP_OLDEST)
    .observeOn(this.scheduler)
    .subscribeOn(this.scheduler)
    .subscribe(this::onNext, this::onError, this::onComplete, this::onSubscribe);
Could not deliver value due to lack of requests
Flowable is disposed. I feel like it is related to the groupBy. Any tips on what might be wrong? Tips on how to debug? Common areas where people create a deadlock with Flowable?
final io.reactivex.rxjava3.core.Flowable _inputFlowable = new com.twilio.raas.reactive.KuduFlowable(
    (com.twilio.kudu.sql.CalciteKuduTable) root.getRootSchema().getSubSchema("kudu").getTable("ReportCenter.UsageReportTransactions"),
    false,
    v0stashed,
    v2stashed,
    v1stashed,
    new org.apache.calcite.linq4j.function.Function1() {
      public Object apply(final Object abstractRow) {
        return new Object[] {
            ((org.apache.kudu.client.RowResult) abstractRow).getInt(0),
            ((org.apache.kudu.client.RowResult) abstractRow).getDecimal(1)};
      }
    },
    new org.apache.calcite.linq4j.function.Predicate1() {
      public boolean apply(final Object abstractRow) {
        return true;
      }
    }).subscribeOn(io.reactivex.rxjava3.schedulers.Schedulers.io(), false).groupBy(com.twilio.calcite.rxjava.rel.RxAggregate.lixSelector(new org.apache.calcite.linq4j.function.Function1() {
      public int apply(Object[] a0) {
        return org.apache.calcite.runtime.SqlFunctions.toInt(a0[0]);
      }
      public Object apply(Object a0) {
        return apply(
            (Object[]) a0);
      }
    }
    )).flatMap(com.twilio.calcite.rxjava.rel.RxAggregate.lixReduceGroup(com.twilio.calcite.rxjava.rel.RxAggregate.lixSeedSupplier(lambdaFactory.accumulatorInitializer()), com.twilio.calcite.rxjava.rel.RxAggregate.lixReducer(lambdaFactory.accumulatorAdder()), lambdaFactory.resultSelector(new org.apache.calcite.linq4j.function.Function2() {
      public Object[] apply(int key, Record2_0 acc) {
        return new Object[] {
            key,
            acc.f0,
            acc.f1};
      }
      public Object[] apply(Integer key, Record2_0 acc) {
        return apply(
            key.intValue(),
            acc);
      }
      public Object[] apply(Object key, Object acc) {
        return apply(
            (Integer) key,
            (Record2_0) acc);
      }
    }
    )));
return _inputFlowable.map(com.twilio.calcite.rxjava.rel.RxCalc.calcMapFunction(lixMapper));