Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Lubna Hopkins
    @HopkinsLubna_twitter
    I will tell you but first think about my question...
    Lubna Hopkins
    @HopkinsLubna_twitter
    Can a brain multi-task?
    Yeah sure seldomly, but what happens when you try to do too many things at once, when you try to multi-task?!?! I know that no matter how intelligent you are, and no matter how MUCH YOU THINK you got your sh_t together, you are bound to falter and miss a few detailed major things when you MULTI-TASKS. So what do you think is going to happen when you WIRE A COMPUTER THAT WAY.
    Lubna Hopkins
    @HopkinsLubna_twitter
    It will definitely go haywire and creating a software like that will get you sued or worst, it can create the wrong data. Especially if you were using it for really important things like curing cancer. You can't possibly trust a multi-tasker software.
    Mark Elston
    @melston
    @mghildiy, ReactiveX is inherently multi-threaded. But it abstracts away much of the complexity of threading and provides a more 'declarative' approach to solving parallel tasks. IOW, you think more about what you want to do and much less about the details of how it is done. The underlying implementation handles the details of threads/thread pools/inter-thread communications/sequencing/etc.
    Soheil Pourbafrani
    @ahoora08

    Hi everybody,

    I'm seeking a solution for error handling of the time a connection exception happens. In my case, the connection has been established to RMQ:

    rabbitMQClient.rxStart()
                        .doOnComplete(() -> logger.info("RMQ Client connected successfully"))
                        .doOnError(error -> logger.error("RMQ connection error: ", error))
                        .retryWhen(throwableFlowable -> throwableFlowable.delay(5, TimeUnit.SECONDS))
                                            ....

    The above chain is of the type Completable. My desire logic is that in time of the connection exception it should be trying to reconnect with a customized delay. For example, at first, it waits 1 second and then tries to connect, the second time it waits 2 seconds and then tries to reconnect, and so on until a max number of reties.

    The current version will try to reconnect every 5 seconds, but can someone give me a hint on how I should implement it to work as I described?

    Thanks

    keenz
    @kindlychung
    how can i convert a list of maybes into a maybe of a list?
    Davide Pugliese
    @Deviad
    Hello everyone, does anyone know why Flux returns only one item in this case:
    https://stackoverflow.com/questions/65867938/flux-returning-only-one-value-instead-of-all-values ?
    Sagar Raj
    @SagarrajRaj_twitter
    Hello All,
    Is there a code smell in the snippet below, I need the same event till end to track the event completion time.
        public void listen() {
            myflux
                .publishOn(myscheduler) // publish on my scheduler
                .flatMap(this::notifyTestService) // call notify test service method
               .transform(this::trackEventCompletionTime) // time event with field
                .subscribe(); // subscribe event
        }
    
        private Mono<TestMono> notifyDeviceHealthService(TestEvent testEvent) {
            return Mono.just(testEvent)  //create a mono from event
                .map(testEventTransformer)  // transform the event
                .flatMap(snsPublisher::publish) // publish the transformed event
                .then(Mono.just(testEvent)); //return back the original event.
        }
    David Karnok
    @akarnokd
    @Deviad @SagarrajRaj_twitter I suggest asking Flux/Mono related questions on the Project Reactor gitter: https://gitter.im/reactor/reactor-core
    @kindlychung
    Maybe.concat(listOfMaybes).toList().toMaybe()
    Sagar Raj
    @SagarrajRaj_twitter
    Thanks @akarnokd , trying for help there too. But just posted here as well since the underlying concepts are likely the same :)
    keenz
    @kindlychung

    @kindlychung
    Maybe.concat(listOfMaybes).toList().toMaybe()

    Thanks!

    Soheil Pourbafrani
    @ahoora08

    I have a shared object between chains of Completables with concatWith. In chain1 I updated some fields of the shared object but it seems in the second chain those updates are not visible and I got the old values for the updated fields. I expected as the concatWiths are executed sequentially, those updates show be accessible.
    My question is this is how the concatWith operator works or I missed something?

    Thanks

    David Karnok
    @akarnokd
    @ahoora08 How are you accessing those fields? Please provide code that demonstrates your problem.
    Soheil Pourbafrani
    @ahoora08

    @akarnokd Thanks for your attention.
    Here is an example

    User user = new User();
    conn.rxInsertUser()
               .map(userId -> {
                                // setId returns void
                                user.setId(userId); 
                                return user;})
               .ignoreElement()
               // inside the concatWith the user object has no id, same with the andThen method
               .concatWith(user ... //)
               .toSingleDefault(user)
               // but here inside the map, the user object has the id field set
               .map(user ... //)

    As I commented inside the code, in the above code, inside the concatWith (and same with andThen) I can not see the updated user instance, but inside the last map I can see the updated user

    I only want to know more about the behavior of the operations and how we can justify such a scenario?

    Thanks

    David Karnok
    @akarnokd
    Please show the concatWith part.
    Dan O'Reilly
    @dano
    @ahoora08 odds are you need to wrap what you're passing to concatWith in Completable.defer(() -> ...)
    Soheil Pourbafrani
    @ahoora08
    @dano True, that was because the Completable was created before subscription (while the update is not applied yet).
    I needed to defer it to after subscription. Thanks
    @akarnokd I think you agree, too
    David Karnok
    @akarnokd
    That is very likely.
    Dan O'Reilly
    @dano
    @akarnokd I'm curious, does your comment here about RxJava having "to do atomics via way more allocations than optimal" still apply with RxJava3?
    David Karnok
    @akarnokd
    Yes. We still don't use the atomic field updaters.
    Dan O'Reilly
    @dano
    Thanks!
    Pratap Koritala
    @pratap_koritala_twitter
    What's the best way to use RxJava in a web application?
    Soheil Pourbafrani
    @ahoora08
    Suppose we have an Initialize method that only sets values for some class parameter, no database call, or else. The method returns Completable. In such a situation which approach for returning a Completable is recommended: 1- using return Completable.create(emitter -> {...}), 2- return Completable.complete() at the end.
    Thanks
    4 replies
    Soheil Pourbafrani
    @ahoora08

    Hi,
    In the following steam, it starts as Maybe and then I try to convert it to Single by flatMapSIngle.

    getNodeId(nodeName)
                        .flatMap(nodeId -> ....)
                        .flatMapSingle(oldData -> ...)
    .....

    My understanding is in such a case if the Maybe was empty, its downstream methods will not be executed. So although I have flatMapSingle in continue, it will not be triggered.
    But in action I'm getting java.util.NoSuchElementException

    Could someone justify this behavior and give me the workaround please?
    Thanks

    5 replies
    Davide Pugliese
    @Deviad
    Soheil Pourbafrani
    @ahoora08

    I need some explanation in chaining Flowables and Completables.Let say we have the following method

    public static Completable start(Flowable<Object> message_queue) {
    
            return message_queue
                    .filter(message -> ....)
                    .flatMapMaybe(....)
                    .flatMapMaybe(....)
                    .map(....)
                    .flatMapMaybe(....)
                    .flatMapMaybe(....)
                    .flatMapSingle(....)
                    .flatMapCompletable(....);
    }

    In the above method I apply some transformations to the stream of items, finally I convert it to a Completable. Let say the flatMapCompletable insert the item into the database which returns a Completable internally. When I subscribe to the method it's executing like a long-running stream that consumes messages from the source and inserts them into the database (which functionally is exactly what I need).
    The part that I can not explain for myself is when we call the flatMapCompletable at the end of the chain, why the flow doesn't stop after processing the first item of Flowable which returns a Completable? It seems it's returning one Completable for each item in the flow. Is that so? In that case, as the method is returning a Completable, when the method returns the completable that makes the flow stop and actually completed.

    Thanks

    4 replies
    Soheil Pourbafrani
    @ahoora08

    In an application, we can have multiple different streams. For example, let say we have the following Completable:

    Completable ccompletable1 = message_queue1
                    .flatMapSingle(....)
                    ....
                    .flatMapCompletable(....);
    Completable ccompletable2 = message_queue2
                    .flatMapSingle(....)
                    .....
                    .flatMapCompletable(....);

    Which of the following options are more efficient?

    // subscribing stream separately
    completable1.subscribe();
    completable2.subscribe();

    Or

    // chain them together
    completable1.
              .mergeWith(completable2)
              .subscribe()

    Generally, what is the best practice for that? For example, in a project, it's better to chain all streams and subscribe once or it's better to subscribe to them separately?

    Soheil Pourbafrani
    @ahoora08

    Hi, Supposing that we have the following Flowable

    message_queue
                    .filter(message -> ....)
                    .flatMapMaybe(....)
                    .flatMapMaybe(....)
                    .map(....)
                    .flatMapMaybe(....)
                    .flatMapMaybe(....)
                    .flatMapSingle(....)
                    .flatMapCompletable(....);

    I wanted to know is there a way to configure it such that if any error happened in processing the items in the entire flow, just the item should be ignored and resume to next. I need something like onErrorResumeNext() without any argument, but it seems in onErrorResumeNext we should set a new Flowable (publisher) to be substituted in time of the error. So, in summary, my questions are how we can just ignore the item in a time of error in that flow and also where we should call that operator to take care of all the flow, for example, if we add the error handling operator at the end of the flow (here after flatMapCompletable()) does it handle errors if any error happened inside the first operator which is filter? If yes, can we call onErrorComplete() after the flatMapCompletable to catch all errors and ignore the item?

    Thanks

    Duy Tran
    @DuyTran1998
    Hi, Can you help me explain the prefetch variable using for ?
        @CheckReturnValue
        @NonNull
        public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch) {
            ObjectHelper.requireNonNull(scheduler, "scheduler");
            ObjectHelper.verifyPositive(prefetch, "prefetch");
            return RxJavaPlugins.onAssembly(new ParallelRunOn<T>(this, scheduler, prefetch));
        }
    David Karnok
    @akarnokd
    runOn, similar to observeOn provides an async boundary where each side could go at its own pace. However, when the producer side is fast responding, requesting one by one would be a performance hog. Therefore, the operator has an internal buffer that gets filled and drained so that requesting more items happens less frequently. The prefetch allows you to control the length of this buffer. When the computation after runOn takes long time or is imbalanced, you may want to keep this value low to ensure rails get roughly equal chance to work on more items in parallel.
    Juan Manuel Fdez
    @Gonlo2
    Hi, I would like to document the exceptions returned by a Single in javadoc but I have doubts about how I could do it. I have seen that somewhere they add them as part of the @return but I am not convinced for when there are several exceptions, do you know of any example or project that documents the exceptions to be able to have it as a reference?
    EvaThil
    @EvaThil

    I am writing an undergraduate theses at Mid Sweden University which is investigating the difference in eye-gaze of programmers while reading imperative and reactive code. I would love some volunteers to perform the experiment. It is done online by solving code snippets of the two paradigms and takes about 15 minutes all up.

    No video is recorded or sent away. All eye-gaze predictions is done client side in your own browser.

    You need a computer in a well lit room with a web camera and you should be familiar with both Java and RxJava.

    The experiment is hosted on the university server and all instructions are given there.
    https://studenter.miun.se/~evth1400/EyeGazeStudy/?group=4

    I am very grateful if anyone would like to partake.

    RishabhDeep Singh
    @rishabhdeepsingh
    Currently we are using RxView to handle throttle clicks inside app and we have utility functions to subscribe on click listeners, we never unsubscribe from them. Is there an easy way to do so? or similar?
        fun throttledClick(
            targetView: View,
            completionMethod: () -> Unit,
            throttleWindow: Long = DOUBLE_CLICK_THROTTLE_WINDOW
        ) {
            RxView.clicks(targetView)
                .throttleFirst(throttleWindow, MILLISECONDS)
                .subscribe {
                    completionMethod.invoke()
                }
        }
    Juan Cuzmar
    @j1cs
    dude this chat is dead
    :S
    lize
    @ZacharyTech
    The exception thrown by Flowable.error was not passed to onError and Firebase reported the crash. This question is posted here, what is the cause of this problem?
    brightinnovator
    @brightinnovator

    I want to read 3 crore csv rows which is of 2GB csv file size and need to insert into MySQL via Java.

    Could someone please help me know the fastest and memory efficient way to avoid out of memory exception as well load in lesser time?

    Please kindly advise.

    Scorpio The Dark
    @scorpio.thedark_gitlab
    good day i have some flow like this.disposableSubscriptionOnUpstream = Flowable.fromPublisher(upstream) .onBackpressureBuffer( 1000 , () -> { messagesDropped.incrementAndGet(); log.error("буфер для канала телеметрии {} переполнен" , channelId); } , BackpressureOverflowStrategy.DROP_OLDEST) .observeOn(this.scheduler) .subscribeOn(this.scheduler) .subscribe(this::onNext, this::onError,this::onComplete,this::onSubscribe);
    and recive this error Could not deliver value due to lack of requests
    when this::onNext some how takes too long time , can you tell me why this error happans, even when .onBackpressureBuffer is set
    David Karnok
    @akarnokd
    I don't see how this could happen. What is the exact stacktrace and what is upstream?
    1 reply
    Scorpio The Dark
    @scorpio.thedark_gitlab
    upstream is BehaviorProcessor. this is some flow from mqtt subscription. i have about 1K such "channals" and some of them (about ten or may be twenty) have very intense input rate , something about 4 or 6 times rep second.
    David Karnok
    @akarnokd
    The stacktrace indicates it fails to emit in a BehaviorProcessor. onBackpressureBuffer requests the MAX amount thus it should not cause this. Is upstream exactly a BehaviorProcessor or do you have some other operators attached to it before the fromPublisher(upstream)?
    brightinnovator
    @brightinnovator
    Can someone help me with latest working microservices demo or sample application to deploy in AWS and test? Please kindly help me..Need to learn it quickly as possible..
    Scott Reynolds
    @sdreynolds
    I have been working on a Calcite Adapter to JavaRx and I have made a mistake that I cannot reason about. The generated code (which is below) doesn't run until the root Flowable is disposed . I feel like it is related to the groupBy. any tips on on what might be wrong? Tips on how to debug ? Common areas people create Deadlock with Flowable?
    /* 100 */   final io.reactivex.rxjava3.core.Flowable _inputFlowable = new com.twilio.raas.reactive.KuduFlowable(                                                                          [109/1945]
    /* 101 */     (com.twilio.kudu.sql.CalciteKuduTable) root.getRootSchema().getSubSchema("kudu").getTable("ReportCenter.UsageReportTransactions"),
    /* 102 */     false,
    /* 103 */     v0stashed,
    /* 104 */     v2stashed,
    /* 105 */     v1stashed,
    /* 106 */     new org.apache.calcite.linq4j.function.Function1(){
    /* 107 */       public Object apply(final Object abstractRow) {
    /* 108 */         return new Object[] {
    /* 109 */ (            (org.apache.kudu.client.RowResult) abstractRow).getInt(0),
    /* 110 */ (            (org.apache.kudu.client.RowResult) abstractRow).getDecimal(1)};
    /* 111 */       }
    /* 112 */
    /* 113 */     },
    /* 114 */     new org.apache.calcite.linq4j.function.Predicate1(){
    /* 115 */       public boolean apply(final Object abstractRow) {
    /* 116 */         return true;
    /* 117 */       }
    /* 118 */
    /* 119 */     }).subscribeOn(io.reactivex.rxjava3.schedulers.Schedulers.io(), false).groupBy(com.twilio.calcite.rxjava.rel.RxAggregate.lixSelector(new org.apache.calcite.linq4j.function.Function1(
    ) {
    /* 120 */     public int apply(Object[] a0) {
    /* 121 */       return org.apache.calcite.runtime.SqlFunctions.toInt(a0[0]);
    /* 122 */     }
    /* 123 */     public Object apply(Object a0) {
    /* 124 */       return apply(
    /* 125 */         (Object[]) a0);
    /* 126 */     }
    /* 127 */   }
    /* 128 */   )).flatMap(com.twilio.calcite.rxjava.rel.RxAggregate.lixReduceGroup(com.twilio.calcite.rxjava.rel.RxAggregate.lixSeedSupplier(lambdaFactory.accumulatorInitializer()), com.twilio.calcit
    e.rxjava.rel.RxAggregate.lixReducer(lambdaFactory.accumulatorAdder()), lambdaFactory.resultSelector(new org.apache.calcite.linq4j.function.Function2() {
    /* 129 */     public Object[] apply(int key, Record2_0 acc) {
    /* 130 */       return new Object[] {
    /* 131 */           key,
    /* 132 */           acc.f0,
    /* 133 */           acc.f1};
    /* 134 */     }
    /* 135 */     public Object[] apply(Integer key, Record2_0 acc) {
    /* 136 */       return apply(
    /* 137 */         key.intValue(),
    /* 138 */         acc);
    /* 139 */     }
    /* 140 */     public Object[] apply(Object key, Object acc) {
    /* 141 */       return apply(
    /* 142 */         (Integer) key,
    /* 143 */         (Record2_0) acc);
    /* 144 */     }
    /* 145 */   }
    /* 146 */   )));
    /* 147 */   return _inputFlowable.map(com.twilio.calcite.rxjava.rel.RxCalc.calcMapFunction(lixMapper));
    4 replies
    Yaz
    @yazalulloa

    Hello I'm trying to page a database using Flowable but in the last page it doesn't signal on Complete and the Flowable never ends

       public static <T> Flowable<T> flowable(CompletionStage<MappedAsyncPagingIterable<T>> stage) {
            return Single.fromCompletionStage(stage)
                    .toFlowable()
                    .flatMap(iterable -> {
                        final Flowable<T> fromIterable = Flowable.fromIterable(iterable.currentPage());
                        if (iterable.hasMorePages()) {
                            final Flowable<T> flowable = flowable(iterable.fetchNextPage());
                            return Flowable.concat(fromIterable, flowable);
                        } else {
                            return fromIterable;
                        }
                    });
        }

    I tried when iterable.hasMorePages() == false to concat the fromIterable Flowable with a Flowable.empty(), but it didn't work, onComplete never get invoked

    Yaz
    @yazalulloa
    The page only fails with one item, if it has more than 1 it completes succesfully
    Scott Reynolds
    @sdreynolds
    This is similar to what I experienced in my Pasted code above. My generated code works if there is several items to return. It fails when there is one