RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Hi everybody,
In some cases, I need to apply a filter operation on the mainstream which afterward I need to process both results branched, not only the true one. So I can avoid filtering one stream twice, once to take the true
branch and the other to take the false
branch.
Do we have any such operator in RxJava2?
Thanks
Hi, I have two unrelated Maybes
that I need to chain in a single stream (to keep the order of execution). The current solution is:
Redis.rxHget(hashName, "value")
.doOnSuccess(to do in success case ...)
.flatMap(response -> Redis.rxHset(args)
.subscribe()
The issue with the above approach is that if the first Maybe resulted in empty, the second one will not be triggered. So I was wondering is there any standard workaround for such a scenario, other than using the toSingle(default value)
method after the first maybe?
Thanks
Hi everybody,
Suppose that we have a list of items that we need to do the same action on each.
Regarding the performance optimization, I want to know is there any differences between the following approaches in the RxJava:
Flowable.fromIterable(itemList)
.flatMap(item -> client.rxQuery(item))
.map(...)
....
.subscribe()
Or
for (Item item : itemList){
.client.rxQuery(item)
.map(...)
.subscribe()
}
In the first approach, we create a flowable of the list's items and then apply the logic but in the second one, we use a for
loop and will have an individual subscribe
method call for each item.
Definitely, the first approach fits better into the RxJava pattern, but technically speaking, having an individual subscribe
call as in the second approach will bring us more overloads?
Thanks
Hello ! Question regarding Flowable.using(): Flowable.using() is cancel() upstream on onComplete() and onError() (see: https://github.com/ReactiveX/RxJava/blob/79f8e6dde6446b1aa33c146eaedbb958086daf56/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableUsing.java#L146). I was wondering whether this could be considered a violation of 2.3 (see: https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code)
Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST NOT call any methods on the Subscription or the Publisher.
Still related to cancel()ation, with adoOnCancel(action)
, it seems a late cancel()
received after a onComplete()
or onError()
would trigger the action
while 2.4 mentions:
Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST consider the Subscription cancelled after having received the signal.
And 3.7 mentions:
After the Subscription is cancelled, additional Subscription.cancel() MUST be NOPs.
doOnCancel
is somewhat deliberate. It will execute the lambda upon cancellation independent if the sequence terminates or not.
upstream.cancel
was a remnant from an early transition to Reactive Streams apparently and has no unit tests associated with the behavior. The side-effect is that nested usings would trigger the outer's cancel path before its onComplete chain has finished, releasing the outer source while the inner is still in the completion phase.
Flowable.just(string)
.doOnNext(this::save)
.map(val -> Flowable.zip(async1(val), async2(val), async3(val), this::collector)
.parallel().runOn(Schedulers.computation()))
.doOnComplete(() -> sendAck(channel, tag))
.subscribe(System.out::println);
@blackdev1l How are those async methods implemented? map
and parallel
won't work in that setup and when you find yourself trying to map
to some Flowable
, that's an indication you most likely need flatMap
.
Flowable.just(string)
.doOnNext(this::save)
.flatMap(val -> Flowable.zip(async1(val), async2(val), async3(val), this::collector))
.doOnComplete(() -> sendAck(channel, tag))
.subscribe(System.out::println);
For this to go parallel with the asyncs, they could look something like this:
public Flowable<T> async1(String val) {
return Flowable.fromCallable(() -> process(val)).subscribeOn(Schedulers.computation());
}
create()
. Is it an Iterable, or Stream? fromXXX
.
Hi everybody,
I'm seeking a solution for error handling of the time a connection exception happens. In my case, the connection has been established to RMQ:
rabbitMQClient.rxStart()
.doOnComplete(() -> logger.info("RMQ Client connected successfully"))
.doOnError(error -> logger.error("RMQ connection error: ", error))
.retryWhen(throwableFlowable -> throwableFlowable.delay(5, TimeUnit.SECONDS))
....
The above chain is of the type Completable. My desire logic is that in time of the connection exception it should be trying to reconnect with a customized delay. For example, at first, it waits 1 second and then tries to connect, the second time it waits 2 seconds and then tries to reconnect, and so on until a max number of reties.
The current version will try to reconnect every 5 seconds, but can someone give me a hint on how I should implement it to work as I described?
Thanks