    Juan Cuzmar
    @j1cs
    thank you
    Juan Cuzmar
    @j1cs
    @akarnokd could this be much cleaner with Flowable? What do you think?
    David Karnok
    @akarnokd
    Juan Cuzmar
    @j1cs
    hmmm
    just one
    i see
    mghildiy
    @mghildiy
    Hi...
    A paragraph is:
    An advantage of this approach is that when you have a bunch of tasks that are not dependent on each other, you can start them all at the same time rather than waiting for each one to finish before starting the next one — that way, your entire bundle of tasks only takes as long to complete as the longest task in the bundle.
    But we can achieve the same with multithreaded programming, can't we?
    So how are the two approaches different?
    Renso Contreras
    @rensocontreras
    Hi, I'm using Firebase to get objects in real time. I would like to execute several observables sequentially (first in, first out). Thank you! https://stackoverflow.com/questions/65631933/rxjava-execute-more-observables-sequentially
    Renso Contreras
    @rensocontreras
    please
    Lubna Hopkins
    @HopkinsLubna_twitter
    I will tell you but first think about my question...
    Lubna Hopkins
    @HopkinsLubna_twitter
    Can a brain multi-task?
    Yeah, sure, seldom. But what happens when you try to do too many things at once, when you try to multi-task?!?! I know that no matter how intelligent you are, and no matter how MUCH YOU THINK you've got your sh_t together, you are bound to falter and miss a few major details when you MULTI-TASK. So what do you think is going to happen when you WIRE A COMPUTER THAT WAY?
    Lubna Hopkins
    @HopkinsLubna_twitter
    It will definitely go haywire, and creating software like that will get you sued or, worse, it can produce the wrong data. Especially if you were using it for really important things like curing cancer. You can't possibly trust multi-tasking software.
    Mark Elston
    @melston
    @mghildiy, ReactiveX is inherently multi-threaded. But it abstracts away much of the complexity of threading and provides a more 'declarative' approach to solving parallel tasks. IOW, you think more about what you want to do and much less about the details of how it is done. The underlying implementation handles the details of threads/thread pools/inter-thread communications/sequencing/etc.
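    The timing claim quoted in the question (a bundle of independent tasks takes only as long as its longest task) can be checked with a small sketch. It assumes plain JDK classes (CompletableFuture and a thread pool) rather than RxJava, so it shows only the timing behaviour, not the ReactiveX API; simulatedTask and runAll are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ParallelTasksSketch {

    // Hypothetical task: sleeps to simulate work, then returns its own duration.
    static long simulatedTask(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while working", e);
        }
        return millis;
    }

    // Starts every task at once on its own pool thread, then waits for all of them.
    static List<Long> runAll(List<Long> durations) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, durations.size()));
        try {
            // collect() is terminal, so all futures are started before any join().
            List<CompletableFuture<Long>> futures = durations.stream()
                    .map(d -> CompletableFuture.supplyAsync(() -> simulatedTask(d), pool))
                    .collect(Collectors.toList());
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<Long> results = runAll(List.of(100L, 200L, 300L));
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Elapsed time should be close to the longest task (300 ms), not the sum (600 ms).
        System.out.println(results + " finished in about " + elapsedMs + " ms");
    }
}
```

    The thread pool, the futures and the joins are all spelled out here; those are the details the message above says a ReactiveX implementation handles for you.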
    Soheil Pourbafrani
    @ahoora08

    Hi everybody,

    I'm looking for a way to handle errors when a connection exception happens. In my case, the connection is to RMQ:

    rabbitMQClient.rxStart()
                        .doOnComplete(() -> logger.info("RMQ Client connected successfully"))
                        .doOnError(error -> logger.error("RMQ connection error: ", error))
                        .retryWhen(throwableFlowable -> throwableFlowable.delay(5, TimeUnit.SECONDS))
                                            ....

    The above chain is of type Completable. The logic I want is that when a connection exception occurs, it tries to reconnect after a customized delay. For example, the first time it waits 1 second and then tries to connect, the second time it waits 2 seconds and then tries to reconnect, and so on until a maximum number of retries.

    The current version tries to reconnect every 5 seconds. Can someone give me a hint on how to implement it to work as I described?

    Thanks
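    The retry schedule described above (wait 1 second, then 2 seconds, and so on up to a maximum number of retries) can be sketched in plain Java. This is only an illustration of the schedule, assuming a blocking task and hypothetical helper names (retryWithBackoff, delayForRetry); it is not the RxJava operator chain. In RxJava, a schedule like this is usually built inside retryWhen, by pairing each error with a counter and returning a timer for the computed delay.

```java
import java.util.function.Supplier;

public class BackoffRetrySketch {

    // Delay before the n-th retry (1-based): 1 x base, 2 x base, 3 x base, ...
    static long delayForRetry(int retryNumber, long baseDelayMillis) {
        return retryNumber * baseDelayMillis;
    }

    // Runs the task; on failure waits an increasing delay and tries again.
    // After maxRetries failed retries, the last failure is rethrown.
    static <T> T retryWithBackoff(Supplier<T> task, int maxRetries, long baseDelayMillis) {
        int retries = 0;
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (retries >= maxRetries) {
                    throw e; // give up and surface the last failure
                }
                retries++;
                sleep(delayForRetry(retries, baseDelayMillis));
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical connection attempt that fails twice before succeeding.
        String result = retryWithBackoff(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("connection refused");
            }
            return "connected";
        }, 5, 1000L);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "connected after 3 attempts"
    }
}
```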

    keenz
    @kindlychung
    how can i convert a list of maybes into a maybe of a list?
    Davide Pugliese
    @Deviad
    Hello everyone, does anyone know why Flux returns only one item in this case:
    https://stackoverflow.com/questions/65867938/flux-returning-only-one-value-instead-of-all-values ?
    Sagar Raj
    @SagarrajRaj_twitter
    Hello All,
    Is there a code smell in the snippet below? I need the same event until the end to track the event completion time.
        public void listen() {
            myflux
                .publishOn(myscheduler) // publish on my scheduler
                .flatMap(this::notifyTestService) // call the notify test service method
                .transform(this::trackEventCompletionTime) // time the event with a field
                .subscribe(); // subscribe to the events
        }
    
        private Mono<TestEvent> notifyTestService(TestEvent testEvent) {
            return Mono.just(testEvent)  // create a Mono from the event
                .map(testEventTransformer)  // transform the event
                .flatMap(snsPublisher::publish) // publish the transformed event
                .then(Mono.just(testEvent)); // return the original event
        }
    David Karnok
    @akarnokd
    @Deviad @SagarrajRaj_twitter I suggest asking Flux/Mono related questions on the Project Reactor gitter: https://gitter.im/reactor/reactor-core
    @kindlychung
    Maybe.concat(listOfMaybes).toList().toMaybe()
    Sagar Raj
    @SagarrajRaj_twitter
    Thanks @akarnokd , trying for help there too. But just posted here as well since the underlying concepts are likely the same :)
    keenz
    @kindlychung

    @kindlychung
    Maybe.concat(listOfMaybes).toList().toMaybe()

    Thanks!

    Soheil Pourbafrani
    @ahoora08

    I have an object shared between chains of Completables joined with concatWith. In chain1 I update some fields of the shared object, but in the second chain those updates do not seem to be visible and I get the old values for the updated fields. Since the concatWith steps are executed sequentially, I expected those updates to be accessible.
    My question is: is this how the concatWith operator works, or am I missing something?

    Thanks

    David Karnok
    @akarnokd
    @ahoora08 How are you accessing those fields? Please provide code that demonstrates your problem.
    Soheil Pourbafrani
    @ahoora08

    @akarnokd Thanks for your attention.
    Here is an example

    User user = new User();
    conn.rxInsertUser()
               .map(userId -> {
                                // setId returns void
                                user.setId(userId); 
                                return user;})
               .ignoreElement()
               // inside the concatWith the user object has no id, same with the andThen method
               .concatWith(user ... //)
               .toSingleDefault(user)
               // but here inside the map, the user object has the id field set
               .map(user ... //)

    As I commented in the code above, inside the concatWith (and the same with andThen) I cannot see the updated user instance, but inside the last map I can see the updated user.

    I only want to understand the behavior of these operators better. How can such a scenario be explained?

    Thanks

    David Karnok
    @akarnokd
    Please show the concatWith part.
    Dan O'Reilly
    @dano
    @ahoora08 odds are you need to wrap what you're passing to concatWith in Completable.defer(() -> ...)
    Soheil Pourbafrani
    @ahoora08
    @dano True, that was because the Completable was created before subscription (while the update is not applied yet).
    I needed to defer it to after subscription. Thanks
    @akarnokd I think you agree, too
    David Karnok
    @akarnokd
    That is very likely.
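    The diagnosis above (the Completable passed to concatWith was built before the first chain had run, so it saw the user without an id) comes down to eager versus deferred evaluation. A minimal sketch of that difference, assuming plain Java with a Supplier standing in for Completable.defer; the User class and describe method are hypothetical stand-ins:

```java
import java.util.function.Supplier;

public class DeferSketch {

    // Hypothetical stand-in for the User class from the chat.
    static class User {
        private Long id;
        void setId(Long id) { this.id = id; }
        Long getId() { return id; }
    }

    // Stand-in for building the second step: it captures the id as it is right now.
    static String describe(User user) {
        return "user id = " + user.getId();
    }

    // Returns { eagerResult, deferredResult }.
    static String[] demo() {
        User user = new User();

        // Eager: describe(user) runs immediately, before the id has been set.
        String eager = describe(user);

        // Deferred: the body runs only when get() is called later.
        Supplier<String> deferred = () -> describe(user);

        // The "first chain" runs now and sets the id.
        user.setId(42L);

        return new String[] { eager, deferred.get() };
    }

    public static void main(String[] args) {
        String[] results = demo();
        System.out.println(results[0]); // prints "user id = null"
        System.out.println(results[1]); // prints "user id = 42"
    }
}
```

    An argument expression written directly inside concatWith(...) behaves like the eager line: it is evaluated while the chain is being assembled. Wrapping it in Completable.defer(() -> ...) moves that evaluation to subscription time, like the Supplier here.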
    Dan O'Reilly
    @dano
    @akarnokd I'm curious, does your comment here about RxJava having "to do atomics via way more allocations than optimal" still apply with RxJava3?
    David Karnok
    @akarnokd
    Yes. We still don't use the atomic field updaters.
    Dan O'Reilly
    @dano
    Thanks!
    Pratap Koritala
    @pratap_koritala_twitter
    What's the best way to use RxJava in a web application?
    Soheil Pourbafrani
    @ahoora08
    Suppose we have an initialize method that only sets values for some class fields, with no database call or anything else. The method returns a Completable. In such a situation, which approach is recommended for returning the Completable: 1- wrapping the work in return Completable.create(emitter -> {...}), or 2- doing the work directly and calling return Completable.complete() at the end?
    Thanks
    Soheil Pourbafrani
    @ahoora08

    Hi,
    The following stream starts as a Maybe, and then I try to convert it to a Single with flatMapSingle.

    getNodeId(nodeName)
                        .flatMap(nodeId -> ....)
                        .flatMapSingle(oldData -> ...)
    .....

    My understanding is that if the Maybe is empty, its downstream operators will not be executed. So although a flatMapSingle follows, it should not be triggered.
    But in practice I'm getting java.util.NoSuchElementException.

    Could someone explain this behavior and suggest a workaround, please?
    Thanks

    Davide Pugliese
    @Deviad
    Soheil Pourbafrani
    @ahoora08

    I need some explanation about chaining Flowables and Completables. Let's say we have the following method:

    public static Completable start(Flowable<Object> message_queue) {
    
            return message_queue
                    .filter(message -> ....)
                    .flatMapMaybe(....)
                    .flatMapMaybe(....)
                    .map(....)
                    .flatMapMaybe(....)
                    .flatMapMaybe(....)
                    .flatMapSingle(....)
                    .flatMapCompletable(....);
    }

    In the above method I apply some transformations to the stream of items, and finally I convert it to a Completable. Let's say the flatMapCompletable inserts the item into the database, which internally returns a Completable. When I subscribe to the result of the method, it executes like a long-running stream that consumes messages from the source and inserts them into the database (which functionally is exactly what I need).
    The part I cannot explain to myself is this: when we call flatMapCompletable at the end of the chain, why doesn't the flow stop after processing the first item of the Flowable, which returns a Completable? It seems to return one Completable for each item in the flow. Is that so? In that case, since the method returns a Completable, I would expect the flow to stop and complete as soon as that Completable is returned.

    Thanks

    Soheil Pourbafrani
    @ahoora08

    In an application, we can have multiple different streams. For example, let's say we have the following Completables:

    Completable completable1 = message_queue1
                    .flatMapSingle(....)
                    ....
                    .flatMapCompletable(....);
    Completable completable2 = message_queue2
                    .flatMapSingle(....)
                    .....
                    .flatMapCompletable(....);

    Which of the following options is more efficient?

    // subscribing stream separately
    completable1.subscribe();
    completable2.subscribe();

    Or

    // chain them together
    completable1
              .mergeWith(completable2)
              .subscribe();

    Generally, what is the best practice here? For example, in a project, is it better to chain all streams and subscribe once, or to subscribe to them separately?

    Soheil Pourbafrani
    @ahoora08

    Hi, suppose we have the following Flowable:

    message_queue
                    .filter(message -> ....)
                    .flatMapMaybe(....)
                    .flatMapMaybe(....)
                    .map(....)
                    .flatMapMaybe(....)
                    .flatMapMaybe(....)
                    .flatMapSingle(....)
                    .flatMapCompletable(....);

    I wanted to know whether there is a way to configure it so that if an error happens while processing an item anywhere in the flow, just that item is ignored and processing resumes with the next one. I need something like onErrorResumeNext() without any argument, but it seems onErrorResumeNext requires a new Flowable (publisher) to substitute when the error occurs.

    So, in summary, my questions are: how can we just ignore the item when an error occurs in that flow, and where should we call that operator so that it covers the whole flow? For example, if we add the error-handling operator at the end of the flow (here, after flatMapCompletable()), does it handle an error raised inside the first operator, which is filter? If yes, can we call onErrorComplete() after flatMapCompletable to catch all errors and ignore the item?

    Thanks
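    One way to think about "ignore the failing item and continue" is to handle the failure inside the per-item step, so that the outer flow never sees an error. The sketch below assumes plain Java streams rather than RxJava, and skipOnError and parseAll are hypothetical names. In RxJava terms this roughly corresponds to applying the error handler to the inner source returned inside each flatMap (for example onErrorComplete() on the inner Maybe), because an error that reaches the outer Flowable terminates it.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SkipFailedItemsSketch {

    // Wraps a per-item step so that a failure yields "no result" instead of aborting the whole run.
    static <A, B> Function<A, Optional<B>> skipOnError(Function<A, B> step) {
        return item -> {
            try {
                return Optional.ofNullable(step.apply(item));
            } catch (RuntimeException e) {
                System.err.println("Skipping " + item + ": " + e);
                return Optional.empty();
            }
        };
    }

    // Parses every message, dropping the ones that fail to parse.
    static List<Integer> parseAll(List<String> messages) {
        Function<String, Optional<Integer>> safeParse = skipOnError(Integer::parseInt);
        return messages.stream()
                .map(safeParse)
                .flatMap(Optional::stream) // empty results simply disappear from the flow
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parseAll(List.of("1", "oops", "3"))); // prints [1, 3]
    }
}
```

    A single handler placed at the very end of the chain works differently: it can swallow the error, but by then the flow has already stopped, so the remaining items are not processed.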

    DuyTran1998
    @DuyTran1998
    Hi, can you help me understand what the prefetch parameter is used for?
        @CheckReturnValue
        @NonNull
        public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch) {
            ObjectHelper.requireNonNull(scheduler, "scheduler");
            ObjectHelper.verifyPositive(prefetch, "prefetch");
            return RxJavaPlugins.onAssembly(new ParallelRunOn<T>(this, scheduler, prefetch));
        }
    David Karnok
    @akarnokd
    runOn, similar to observeOn, provides an async boundary where each side can go at its own pace. However, when the producer side responds quickly, requesting items one by one would be a performance hog. Therefore, the operator has an internal buffer that gets filled and drained so that requesting more items happens less frequently. The prefetch parameter lets you control the length of this buffer. When the computation after runOn takes a long time or is imbalanced, you may want to keep this value low to ensure the rails get a roughly equal chance to work on more items in parallel.
    Juan Manuel Fdez
    @Gonlo2
    Hi, I would like to document the exceptions signalled by a Single in Javadoc, but I'm not sure how best to do it. I have seen them added as part of the @return in some places, but I am not convinced that works well when there are several exceptions. Do you know of any example or project that documents the exceptions, which I could use as a reference?
    EvaThil
    @EvaThil

    I am writing an undergraduate thesis at Mid Sweden University which investigates the difference in eye gaze of programmers while reading imperative and reactive code. I would love some volunteers to perform the experiment. It is done online by solving code snippets in the two paradigms and takes about 15 minutes in total.

    No video is recorded or sent away. All eye-gaze predictions are done client side in your own browser.

    You need a computer in a well lit room with a web camera and you should be familiar with both Java and RxJava.

    The experiment is hosted on the university server and all instructions are given there.
    https://studenter.miun.se/~evth1400/EyeGazeStudy/?group=4

    I would be very grateful if anyone would like to take part.

    RishabhDeep Singh
    @rishabhdeepsingh
    Currently we are using RxView to throttle clicks inside the app, and we have utility functions that subscribe to click listeners, but we never unsubscribe from them. Is there an easy way to do so, or something similar?
        fun throttledClick(
            targetView: View,
            completionMethod: () -> Unit,
            throttleWindow: Long = DOUBLE_CLICK_THROTTLE_WINDOW
        ) {
            RxView.clicks(targetView)
                .throttleFirst(throttleWindow, MILLISECONDS)
                .subscribe {
                    completionMethod.invoke()
                }
        }
    Juan Cuzmar
    @j1cs
    dude this chat is dead
    :S