Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Sagar Raj
    @SagarrajRaj_twitter
    Hi, I am looking recommendation on (groupBy & subscribe) vs (flux multicast and filter) to take different action on different event types on the flux.
    https://stackoverflow.com/questions/61814106/choice-between-groupby-and-filter-with-infinite-streams-in-reactor
    Sagar Raj
    @SagarrajRaj_twitter
    I am starting with reactive development and I believe the approach is share the flux and filter them in each of the pipelines. I understand that the thread count would be lower, but would this not increase the CPU as computations are lot higher. (I would use a switch case in the regular programming model)
    Soheil Pourbafrani
    @ahoora08

    Hi everybody,
    In some cases, I need to apply a filter operation on the mainstream which afterward I need to process both results branched, not only the true one. So I can avoid filtering one stream twice, once to take the true branch and the other to take the false branch.
    Do we have any such operator in RxJava2?

    Thanks

    1 reply
    Soheil Pourbafrani
    @ahoora08

    Hi, I have two unrelated Maybes that I need to chain in a single stream (to keep the order of execution). The current solution is:

    Redis.rxHget(hashName, "value")
                        .doOnSuccess(to do in success case ...)
                        .flatMap(response -> Redis.rxHset(args)
                        .subscribe()

    The issue with the above approach is that if the first Maybe resulted in empty, the second one will not be triggered. So I was wondering is there any standard workaround for such a scenario, other than using the toSingle(default value) method after the first maybe?

    Thanks

    3 replies
    Volnei Munhoz
    @volnei_gitlab
    Hi guys! I need your help again.
    Volnei Munhoz
    @volnei_gitlab
    Im working with microservices and need to combine multiple results to a single VO. For example, I have a Product stream as base and need to combine the Product, Pricing and Inventory in a single result. How could I do this? Can I do calls to pricing and inventory in parallel way? Thanks. I know that this way is called Aggregator Pattern, but I have no idea how to do this in RxJava
    Soheil Pourbafrani
    @ahoora08

    Hi everybody,

    Suppose that we have a list of items that we need to do the same action on each.
    Regarding the performance optimization, I want to know is there any differences between the following approaches in the RxJava:

    Flowable.fromIterable(itemList)
    .flatMap(item -> client.rxQuery(item))
    .map(...)
    ....
    .subscribe()

    Or

    for (Item item : itemList){
        .client.rxQuery(item)
        .map(...)
        .subscribe()
    }

    In the first approach, we create a flowable of the list's items and then apply the logic but in the second one, we use a for loop and will have an individual subscribe method call for each item.

    Definitely, the first approach fits better into the RxJava pattern, but technically speaking, having an individual subscribe call as in the second approach will bring us more overloads?

    Thanks

    Dmitry Saviuk
    @Dimezis
    Is it a bug or feature that TestScheduler doesn't use RxJavaPlugins.onSchedule hook?
    JokerSun
    @JokerSun
    Hello everyone, I have a question that I want to keep just one object in the timed buffer by some filter strategy such as max score, is there any solution?
    For example that there is a source which publish object in the random time, and I create a buffer to collect that in per-second and when better score object appears then replace the previous one.
    Yannick Lecaillez
    @ylecaillez

    Hello ! Question regarding Flowable.using(): Flowable.using() is cancel() upstream on onComplete() and onError() (see: https://github.com/ReactiveX/RxJava/blob/79f8e6dde6446b1aa33c146eaedbb958086daf56/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableUsing.java#L146). I was wondering whether this could be considered a violation of 2.3 (see: https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code)

    Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST NOT call any methods on the Subscription or the Publisher.

    Yannick Lecaillez
    @ylecaillez

    Still related to cancel()ation, with adoOnCancel(action), it seems a late cancel() received after a onComplete() or onError() would trigger the action while 2.4 mentions:

    Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST consider the Subscription cancelled after having received the signal.

    And 3.7 mentions:

    After the Subscription is cancelled, additional Subscription.cancel() MUST be NOPs.

    Thanks in advance for your lights !
    David Karnok
    @akarnokd
    The upstream.cancel certainly look unnecessary. I'll have to check why those ended up in there.
    The doOnCancel is somewhat deliberate. It will execute the lambda upon cancellation independent if the sequence terminates or not.
    David Karnok
    @akarnokd
    The upstream.cancel was a remnant from an early transition to Reactive Streams apparently and has no unit tests associated with the behavior. The side-effect is that nested usings would trigger the outer's cancel path before its onComplete chain has finished, releasing the outer source while the inner is still in the completion phase.
    David Karnok
    @akarnokd
    Posted ReactiveX/RxJava#7121 to fix using.
    Yannick Lecaillez
    @ylecaillez
    Thanks ! Isn't the doOnCancel() controversial ? By executing the action I don't think we can consider it behaves like a no-op ?
    Yannick Lecaillez
    @ylecaillez
    Maybe 2.4 and 3.7 are not explicit enough. Maybe a Subscription considered as being canceled after a onComplete/onError is not exactly the same thing than a Subscription being canceled explicitly as a result of cancel() invocation ?
    David Karnok
    @akarnokd
    In my reading "considered as being cancelled" only means the Publisher should (eventually) stop signaling the Subscriber, but it doesn't mean or imply that all side effects should cease to happen as well. Consequently, running the side effect in doOnCancel is the discretion of the implementation and beyond the scope of the spec.
    Yannick Lecaillez
    @ylecaillez
    Ah right, that makes sense. Again, thank you
    (and thanks for your amazing work)
    Cristian Achille
    @blackdev1l
    Hello everyone, i'm trying to do something (i think) simple but i can't grasp how to do this simple task. I'm trying to parallelize a series of operation done after a simple doOnNext and then get back to the flow with results of the single operation, i write succesfully a zip but i don't know how to parallelize the ooperation inside of it, can i accomplish the samething with a flat map? or is there any better way to do it?
         Flowable.just(string)
                    .doOnNext(this::save)
                    .map(val -> Flowable.zip(async1(val), async2(val), async3(val), this::collector)
                            .parallel().runOn(Schedulers.computation()))
                    .doOnComplete(() -> sendAck(channel, tag))
                    .subscribe(System.out::println);
    Cristian Achille
    @blackdev1l
    i'm in the exact same situation of https://stackoverflow.com/questions/49027355/rxjava-2-zip-operation-in-different-threads/49027583#49027583 but the answer doesn't work for me and i can't find any resource which works
    Jasper Timmer
    @JJWTimmer
    Is it possible to stream events (like numbers, SSE style?) to the client as long as it's connected and dispose when it disconnects?
    Juan Cuzmar
    @glats
    hello everyone.
    i am learning rxjava and would like to see the possibility if someone could review my code and improve it.
    specifically i want to see how to program nested observables
    http://ix.io/2Hly
    David Karnok
    @akarnokd

    @blackdev1l How are those async methods implemented? map and parallel won't work in that setup and when you find yourself trying to map to some Flowable, that's an indication you most likely need flatMap.

    Flowable.just(string)
    .doOnNext(this::save)
    .flatMap(val -> Flowable.zip(async1(val), async2(val), async3(val), this::collector))
    .doOnComplete(() -> sendAck(channel, tag))
    .subscribe(System.out::println);

    For this to go parallel with the asyncs, they could look something like this:

    public Flowable<T> async1(String val) {
        return Flowable.fromCallable(() -> process(val)).subscribeOn(Schedulers.computation());
    }
    @JJWTimmer depends on how that event source looks like. Is it generated imperatively? -> create(). Is it an Iterable, or Stream? fromXXX.
    David Karnok
    @akarnokd
    @glats Looks about right, but note that doOnError only peeks at the error and does not make it a non-error; for those you would have to pick a proper error handler operator: onErrorComplete, onErrorResumeWith, etc.
    Juan Cuzmar
    @glats
    thank you
    Juan Cuzmar
    @glats
    @akarnokd with Flowable could be much more cleaner? what do you think?
    David Karnok
    @akarnokd
    Juan Cuzmar
    @glats
    hmmm
    just one
    i see
    mghildiy
    @mghildiy
    Hi...
    A paragraph is:
    An advantage of this approach is that when you have a bunch of tasks that are not dependent on each other, you can start them all at the same time rather than waiting for each one to finish before starting the next one — that way, your entire bundle of tasks only takes as long to complete as the longest task in the bundle.
    But we can achieve same with multhtreaded programming...isn't?
    so how are two approaches different
    Renso Contreras
    @rensocontreras
    Hi ,I'm using Firebase to get objects a real time, I would like to execute more observables sequentially (first in first out), thanks you! https://stackoverflow.com/questions/65631933/rxjava-execute-more-observables-sequentially
    Renso Contreras
    @rensocontreras
    please
    Lubna Hopkins
    @HopkinsLubna_twitter
    I will tell you but first think about my question...
    Lubna Hopkins
    @HopkinsLubna_twitter
    Can a brain multi-task?
    Yeah sure seldomly, but what happens when you try to do too many things at once, when you try to multi-task?!?! I know that no matter how intelligent you are, and no matter how MUCH YOU THINK you got your sh_t together, you are bound to falter and miss a few detailed major things when you MULTI-TASKS. So what do you think is going to happen when you WIRE A COMPUTER THAT WAY.
    Lubna Hopkins
    @HopkinsLubna_twitter
    It will definitely go haywire and creating a software like that will get you sued or worst, it can create the wrong data. Especially if you were using it for really important things like curing cancer. You can't possibly trust a multi-tasker software.
    Mark Elston
    @melston
    @mghildiy, ReactiveX is inherently multi-threaded. But it abstracts away much of the complexity of threading and provides a more 'declarative' approach to solving parallel tasks. IOW, you think more about what you want to do and much less about the details of how it is done. The underlying implementation handles the details of threads/thread pools/inter-thread communications/sequencing/etc.
    Soheil Pourbafrani
    @ahoora08

    Hi everybody,

    I'm seeking a solution for error handling of the time a connection exception happens. In my case, the connection has been established to RMQ:

    rabbitMQClient.rxStart()
                        .doOnComplete(() -> logger.info("RMQ Client connected successfully"))
                        .doOnError(error -> logger.error("RMQ connection error: ", error))
                        .retryWhen(throwableFlowable -> throwableFlowable.delay(5, TimeUnit.SECONDS))
                                            ....

    The above chain is of the type Completable. My desire logic is that in time of the connection exception it should be trying to reconnect with a customized delay. For example, at first, it waits 1 second and then tries to connect, the second time it waits 2 seconds and then tries to reconnect, and so on until a max number of reties.

    The current version will try to reconnect every 5 seconds, but can someone give me a hint on how I should implement it to work as I described?

    Thanks