RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Flowable.error
was not passed to onError
and Firebase reported the crash. This question is posted here, what is the cause of this problem?
this.disposableSubscriptionOnUpstream = Flowable.fromPublisher(upstream)
.onBackpressureBuffer(
1000
, () -> {
messagesDropped.incrementAndGet();
log.error("буфер для канала телеметрии {} переполнен"
, channelId);
}
, BackpressureOverflowStrategy.DROP_OLDEST)
.observeOn(this.scheduler)
.subscribeOn(this.scheduler)
.subscribe(this::onNext, this::onError,this::onComplete,this::onSubscribe);
Could not deliver value due to lack of requests
Flowable
is disposed . I feel like it is related to the groupBy
. any tips on on what might be wrong? Tips on how to debug ? Common areas people create Deadlock with Flowable
?/* 100 */ final io.reactivex.rxjava3.core.Flowable _inputFlowable = new com.twilio.raas.reactive.KuduFlowable( [109/1945]
/* 101 */ (com.twilio.kudu.sql.CalciteKuduTable) root.getRootSchema().getSubSchema("kudu").getTable("ReportCenter.UsageReportTransactions"),
/* 102 */ false,
/* 103 */ v0stashed,
/* 104 */ v2stashed,
/* 105 */ v1stashed,
/* 106 */ new org.apache.calcite.linq4j.function.Function1(){
/* 107 */ public Object apply(final Object abstractRow) {
/* 108 */ return new Object[] {
/* 109 */ ( (org.apache.kudu.client.RowResult) abstractRow).getInt(0),
/* 110 */ ( (org.apache.kudu.client.RowResult) abstractRow).getDecimal(1)};
/* 111 */ }
/* 112 */
/* 113 */ },
/* 114 */ new org.apache.calcite.linq4j.function.Predicate1(){
/* 115 */ public boolean apply(final Object abstractRow) {
/* 116 */ return true;
/* 117 */ }
/* 118 */
/* 119 */ }).subscribeOn(io.reactivex.rxjava3.schedulers.Schedulers.io(), false).groupBy(com.twilio.calcite.rxjava.rel.RxAggregate.lixSelector(new org.apache.calcite.linq4j.function.Function1(
) {
/* 120 */ public int apply(Object[] a0) {
/* 121 */ return org.apache.calcite.runtime.SqlFunctions.toInt(a0[0]);
/* 122 */ }
/* 123 */ public Object apply(Object a0) {
/* 124 */ return apply(
/* 125 */ (Object[]) a0);
/* 126 */ }
/* 127 */ }
/* 128 */ )).flatMap(com.twilio.calcite.rxjava.rel.RxAggregate.lixReduceGroup(com.twilio.calcite.rxjava.rel.RxAggregate.lixSeedSupplier(lambdaFactory.accumulatorInitializer()), com.twilio.calcit
e.rxjava.rel.RxAggregate.lixReducer(lambdaFactory.accumulatorAdder()), lambdaFactory.resultSelector(new org.apache.calcite.linq4j.function.Function2() {
/* 129 */ public Object[] apply(int key, Record2_0 acc) {
/* 130 */ return new Object[] {
/* 131 */ key,
/* 132 */ acc.f0,
/* 133 */ acc.f1};
/* 134 */ }
/* 135 */ public Object[] apply(Integer key, Record2_0 acc) {
/* 136 */ return apply(
/* 137 */ key.intValue(),
/* 138 */ acc);
/* 139 */ }
/* 140 */ public Object[] apply(Object key, Object acc) {
/* 141 */ return apply(
/* 142 */ (Integer) key,
/* 143 */ (Record2_0) acc);
/* 144 */ }
/* 145 */ }
/* 146 */ )));
/* 147 */ return _inputFlowable.map(com.twilio.calcite.rxjava.rel.RxCalc.calcMapFunction(lixMapper));
Hello I'm trying to page a database using Flowable but in the last page it doesn't signal on Complete and the Flowable never ends
public static <T> Flowable<T> flowable(CompletionStage<MappedAsyncPagingIterable<T>> stage) {
return Single.fromCompletionStage(stage)
.toFlowable()
.flatMap(iterable -> {
final Flowable<T> fromIterable = Flowable.fromIterable(iterable.currentPage());
if (iterable.hasMorePages()) {
final Flowable<T> flowable = flowable(iterable.fetchNextPage());
return Flowable.concat(fromIterable, flowable);
} else {
return fromIterable;
}
});
}
I tried when iterable.hasMorePages() == false to concat the fromIterable Flowable with a Flowable.empty(), but it didn't work, onComplete never get invoked
this instanceof Fuseable
and then cast to QueueSubscription
along the way