Where communities thrive

  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
Repo info

    I am writing an undergraduate theses at Mid Sweden University which is investigating the difference in eye-gaze of programmers while reading imperative and reactive code. I would love some volunteers to perform the experiment. It is done online by solving code snippets of the two paradigms and takes about 15 minutes all up.

    No video is recorded or sent away. All eye-gaze predictions is done client side in your own browser.

    You need a computer in a well lit room with a web camera and you should be familiar with both Java and RxJava.

    The experiment is hosted on the university server and all instructions are given there.

    I am very grateful if anyone would like to partake.

    RishabhDeep Singh
    Currently we are using RxView to handle throttle clicks inside app and we have utility functions to subscribe on click listeners, we never unsubscribe from them. Is there an easy way to do so? or similar?
        fun throttledClick(
            targetView: View,
            completionMethod: () -> Unit,
            throttleWindow: Long = DOUBLE_CLICK_THROTTLE_WINDOW
        ) {
                .throttleFirst(throttleWindow, MILLISECONDS)
                .subscribe {
    Juan Cuzmar
    dude this chat is dead
    The exception thrown by Flowable.error was not passed to onError and Firebase reported the crash. This question is posted here, what is the cause of this problem?

    I want to read 3 crore csv rows which is of 2GB csv file size and need to insert into MySQL via Java.

    Could someone please help me know the fastest and memory efficient way to avoid out of memory exception as well load in lesser time?

    Please kindly advise.

    Scorpio The Dark
    good day i have some flow like this.disposableSubscriptionOnUpstream = Flowable.fromPublisher(upstream) .onBackpressureBuffer( 1000 , () -> { messagesDropped.incrementAndGet(); log.error("буфер для канала телеметрии {} переполнен" , channelId); } , BackpressureOverflowStrategy.DROP_OLDEST) .observeOn(this.scheduler) .subscribeOn(this.scheduler) .subscribe(this::onNext, this::onError,this::onComplete,this::onSubscribe);
    and recive this error Could not deliver value due to lack of requests
    when this::onNext some how takes too long time , can you tell me why this error happans, even when .onBackpressureBuffer is set
    David Karnok
    I don't see how this could happen. What is the exact stacktrace and what is upstream?
    1 reply
    Scorpio The Dark
    upstream is BehaviorProcessor. this is some flow from mqtt subscription. i have about 1K such "channals" and some of them (about ten or may be twenty) have very intense input rate , something about 4 or 6 times rep second.
    David Karnok
    The stacktrace indicates it fails to emit in a BehaviorProcessor. onBackpressureBuffer requests the MAX amount thus it should not cause this. Is upstream exactly a BehaviorProcessor or do you have some other operators attached to it before the fromPublisher(upstream)?
    Can someone help me with latest working microservices demo or sample application to deploy in AWS and test? Please kindly help me..Need to learn it quickly as possible..
    Scott Reynolds
    I have been working on a Calcite Adapter to JavaRx and I have made a mistake that I cannot reason about. The generated code (which is below) doesn't run until the root Flowable is disposed . I feel like it is related to the groupBy. any tips on on what might be wrong? Tips on how to debug ? Common areas people create Deadlock with Flowable?
    /* 100 */   final io.reactivex.rxjava3.core.Flowable _inputFlowable = new com.twilio.raas.reactive.KuduFlowable(                                                                          [109/1945]
    /* 101 */     (com.twilio.kudu.sql.CalciteKuduTable) root.getRootSchema().getSubSchema("kudu").getTable("ReportCenter.UsageReportTransactions"),
    /* 102 */     false,
    /* 103 */     v0stashed,
    /* 104 */     v2stashed,
    /* 105 */     v1stashed,
    /* 106 */     new org.apache.calcite.linq4j.function.Function1(){
    /* 107 */       public Object apply(final Object abstractRow) {
    /* 108 */         return new Object[] {
    /* 109 */ (            (org.apache.kudu.client.RowResult) abstractRow).getInt(0),
    /* 110 */ (            (org.apache.kudu.client.RowResult) abstractRow).getDecimal(1)};
    /* 111 */       }
    /* 112 */
    /* 113 */     },
    /* 114 */     new org.apache.calcite.linq4j.function.Predicate1(){
    /* 115 */       public boolean apply(final Object abstractRow) {
    /* 116 */         return true;
    /* 117 */       }
    /* 118 */
    /* 119 */     }).subscribeOn(io.reactivex.rxjava3.schedulers.Schedulers.io(), false).groupBy(com.twilio.calcite.rxjava.rel.RxAggregate.lixSelector(new org.apache.calcite.linq4j.function.Function1(
    ) {
    /* 120 */     public int apply(Object[] a0) {
    /* 121 */       return org.apache.calcite.runtime.SqlFunctions.toInt(a0[0]);
    /* 122 */     }
    /* 123 */     public Object apply(Object a0) {
    /* 124 */       return apply(
    /* 125 */         (Object[]) a0);
    /* 126 */     }
    /* 127 */   }
    /* 128 */   )).flatMap(com.twilio.calcite.rxjava.rel.RxAggregate.lixReduceGroup(com.twilio.calcite.rxjava.rel.RxAggregate.lixSeedSupplier(lambdaFactory.accumulatorInitializer()), com.twilio.calcit
    e.rxjava.rel.RxAggregate.lixReducer(lambdaFactory.accumulatorAdder()), lambdaFactory.resultSelector(new org.apache.calcite.linq4j.function.Function2() {
    /* 129 */     public Object[] apply(int key, Record2_0 acc) {
    /* 130 */       return new Object[] {
    /* 131 */           key,
    /* 132 */           acc.f0,
    /* 133 */           acc.f1};
    /* 134 */     }
    /* 135 */     public Object[] apply(Integer key, Record2_0 acc) {
    /* 136 */       return apply(
    /* 137 */         key.intValue(),
    /* 138 */         acc);
    /* 139 */     }
    /* 140 */     public Object[] apply(Object key, Object acc) {
    /* 141 */       return apply(
    /* 142 */         (Integer) key,
    /* 143 */         (Record2_0) acc);
    /* 144 */     }
    /* 145 */   }
    /* 146 */   )));
    /* 147 */   return _inputFlowable.map(com.twilio.calcite.rxjava.rel.RxCalc.calcMapFunction(lixMapper));
    4 replies

    Hello I'm trying to page a database using Flowable but in the last page it doesn't signal on Complete and the Flowable never ends

       public static <T> Flowable<T> flowable(CompletionStage<MappedAsyncPagingIterable<T>> stage) {
            return Single.fromCompletionStage(stage)
                    .flatMap(iterable -> {
                        final Flowable<T> fromIterable = Flowable.fromIterable(iterable.currentPage());
                        if (iterable.hasMorePages()) {
                            final Flowable<T> flowable = flowable(iterable.fetchNextPage());
                            return Flowable.concat(fromIterable, flowable);
                        } else {
                            return fromIterable;

    I tried when iterable.hasMorePages() == false to concat the fromIterable Flowable with a Flowable.empty(), but it didn't work, onComplete never get invoked

    The page only fails with one item, if it has more than 1 it completes succesfully
    Scott Reynolds
    This is similar to what I experienced in my Pasted code above. My generated code works if there is several items to return. It fails when there is one
    Grigore Cristian-Andrei
    Duy Tran
    how I can get item which droped when overflow in onBackpressureBuffer()
    Hello, I have some very general question: I am facing some UndeliverableExceptions in our application. Nevertheless I debugged and could reproduce the issue. So I have some question regarding correct implementation of .flatMap and if my assumption is right:
    If I have a method like return receivedSingle.flatMap( single -> fetchAnotherSingle(single.getId())).flatMap(single-> fetchAnotherAnotherSingle(UUID.randomUUI()));
    if the fetchAnotherSingle(single.getId()) ends or is cancelled before fetchAnotherAnotherSingle(UUID.randomUUI()) is returned, can this lead to such Undeliverable(wrapped Timeout Exceptions) ?
    btw. we are using RxJava2
    The other option is that an error from a Single.zip() is not properly handled
    Bruce Zhang
    Oleh Dokuka
    @akarnokd do you remember if there is a significant overhead of keeping Fusion interfaces and QueueSubscription constant part of the chain
    I'm wondering about this need to always have this instanceof Fuseable and then cast to QueueSubscription along the way
    vs always keeping QueueSubscription as something expected from the upstream
    David Karnok
    @OlegDokuka I don't have measurements for this. I don't expect it to be significant. You could practically redo the RS interfaces (i.e., not even extending/implementing them) and relegating RS to a pure external interop feature.
    Oleh Dokuka
    @akarnokd I wonder if we make everything fuseable and then, depending on the condition one will fuse or just refuse to do so. The benefit is the reduction of duplicate classes within the codebase. However, the negative aspect could be possible (although I expect it to be minor) performance degradation
    2 replies