RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
this.disposableSubscriptionOnUpstream = Flowable.fromPublisher(upstream)
    .onBackpressureBuffer(
        1000,
        () -> {
            // Count each overflow and log which channel it happened on
            messagesDropped.incrementAndGet();
            log.error("buffer for telemetry channel {} has overflowed", channelId);
        },
        BackpressureOverflowStrategy.DROP_OLDEST)
    .observeOn(this.scheduler)
    .subscribeOn(this.scheduler)
    .subscribe(this::onNext, this::onError, this::onComplete, this::onSubscribe);
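To see what the DROP_OLDEST strategy in the snippet above does in isolation, here is a minimal sketch. It assumes RxJava 3 on the classpath; the class name `DropOldestDemo` and the `run` helper are made up for illustration and are not part of the original code. The consumer requests nothing at first, so the buffer has to overflow.

```java
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.concurrent.atomic.AtomicInteger;

public class DropOldestDemo {

    /**
     * Emits 1..total into a bounded buffer while the consumer requests nothing,
     * then drains the buffer. Returns { overflowCallbackCount, deliveredCount }.
     */
    public static int[] run(int total, int capacity) {
        AtomicInteger dropped = new AtomicInteger();

        TestSubscriber<Integer> ts = Flowable.range(1, total)
            .onBackpressureBuffer(
                capacity,
                dropped::incrementAndGet,                 // called once per overflow
                BackpressureOverflowStrategy.DROP_OLDEST) // evict the oldest buffered item
            .test(0);                                     // subscribe with an initial request of 0

        // Only now does the consumer ask for data; whatever survived in the buffer is delivered.
        ts.requestMore(Long.MAX_VALUE);
        ts.assertComplete();

        return new int[] { dropped.get(), ts.values().size() };
    }

    public static void main(String[] args) {
        int[] result = run(10, 3);
        System.out.println("dropped=" + result[0] + ", delivered=" + result[1]);
    }
}
```

With a slow consumer, only the newest `capacity` items should survive, and the overflow callback should fire once for every evicted item. That is the counter the original snippet increments in `messagesDropped`.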
Could not deliver value due to lack of requests
Flowable is disposed. I feel like it is related to the groupBy. Any tips on what might be wrong? Tips on how to debug? Common areas where people create deadlocks with Flowable?
final io.reactivex.rxjava3.core.Flowable _inputFlowable = new com.twilio.raas.reactive.KuduFlowable(
    (com.twilio.kudu.sql.CalciteKuduTable) root.getRootSchema().getSubSchema("kudu").getTable("ReportCenter.UsageReportTransactions"),
    false,
    v0stashed,
    v2stashed,
    v1stashed,
    new org.apache.calcite.linq4j.function.Function1() {
      public Object apply(final Object abstractRow) {
        return new Object[] {
            ((org.apache.kudu.client.RowResult) abstractRow).getInt(0),
            ((org.apache.kudu.client.RowResult) abstractRow).getDecimal(1)};
      }
    },
    new org.apache.calcite.linq4j.function.Predicate1() {
      public boolean apply(final Object abstractRow) {
        return true;
      }
    })
    .subscribeOn(io.reactivex.rxjava3.schedulers.Schedulers.io(), false)
    .groupBy(com.twilio.calcite.rxjava.rel.RxAggregate.lixSelector(new org.apache.calcite.linq4j.function.Function1() {
      public int apply(Object[] a0) {
        return org.apache.calcite.runtime.SqlFunctions.toInt(a0[0]);
      }
      public Object apply(Object a0) {
        return apply(
            (Object[]) a0);
      }
    }
    ))
    .flatMap(com.twilio.calcite.rxjava.rel.RxAggregate.lixReduceGroup(
        com.twilio.calcite.rxjava.rel.RxAggregate.lixSeedSupplier(lambdaFactory.accumulatorInitializer()),
        com.twilio.calcite.rxjava.rel.RxAggregate.lixReducer(lambdaFactory.accumulatorAdder()),
        lambdaFactory.resultSelector(new org.apache.calcite.linq4j.function.Function2() {
          public Object[] apply(int key, Record2_0 acc) {
            return new Object[] {
                key,
                acc.f0,
                acc.f1};
          }
          public Object[] apply(Integer key, Record2_0 acc) {
            return apply(
                key.intValue(),
                acc);
          }
          public Object[] apply(Object key, Object acc) {
            return apply(
                (Integer) key,
                (Record2_0) acc);
          }
        }
        )));
return _inputFlowable.map(com.twilio.calcite.rxjava.rel.RxCalc.calcMapFunction(lixMapper));
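One thing worth checking with a groupBy followed by flatMap: the groupBy Javadoc warns that groups which are not subscribed to promptly can starve the others, and suggests raising flatMap's maximum concurrency to at least the expected number of groups (or `Integer.MAX_VALUE` if that number is unknown). The sketch below shows that shape with plain integers. It assumes RxJava 3; `GroupByDemo` and `countGroups` are illustrative names, and whether this is the actual cause in the generated code above is not something the snippet alone can confirm.

```java
import io.reactivex.rxjava3.core.Flowable;

public class GroupByDemo {

    /**
     * Splits 0..(groups * itemsPerGroup - 1) into `groups` groups by remainder,
     * reduces every group to its sum, and returns how many group results arrived.
     */
    public static long countGroups(int groups, int itemsPerGroup) {
        return Flowable.range(0, groups * itemsPerGroup)
            .groupBy(i -> i % groups)
            // Subscribe to every group as soon as it appears: the second argument
            // lifts flatMap's concurrency limit so no group waits for a free slot.
            .flatMap(group -> group.reduce(0, Integer::sum).toFlowable(), Integer.MAX_VALUE)
            .count()
            .blockingGet();
    }

    public static void main(String[] args) {
        // More groups than flatMap's default concurrency of 128
        System.out.println(countGroups(500, 4));
    }
}
```

For debugging, it can also help to place `doOnRequest`, `doOnCancel` and `doOnNext` logging between the operators to see which stage stops requesting.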
Hello, I'm trying to page through a database using Flowable, but on the last page it doesn't signal onComplete and the Flowable never ends.
public static <T> Flowable<T> flowable(CompletionStage<MappedAsyncPagingIterable<T>> stage) {
    return Single.fromCompletionStage(stage)
        .toFlowable()
        .flatMap(iterable -> {
            final Flowable<T> fromIterable = Flowable.fromIterable(iterable.currentPage());
            if (iterable.hasMorePages()) {
                final Flowable<T> flowable = flowable(iterable.fetchNextPage());
                return Flowable.concat(fromIterable, flowable);
            } else {
                return fromIterable;
            }
        });
}
When iterable.hasMorePages() == false, I tried concatenating the fromIterable Flowable with Flowable.empty(), but it didn't work; onComplete never gets invoked.
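To narrow this down, it may help to run the same recursive shape against an in-memory source and check that completion arrives there. The sketch below assumes RxJava 3. The `Page` interface is a hypothetical stand-in for `MappedAsyncPagingIterable`, and `fakePages` is a made-up helper, so it runs without a database. It also wraps the next-page fetch in `Flowable.defer`, so the next page is requested only after the current one has been emitted. That is a design choice of the sketch, not a confirmed fix for the problem described above.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class PagingDemo {

    /** Hypothetical stand-in for the driver's paging type. */
    public interface Page<T> {
        Iterable<T> currentPage();
        boolean hasMorePages();
        CompletionStage<Page<T>> fetchNextPage();
    }

    /** Emits the current page, then lazily continues with the next one, if any. */
    public static <T> Flowable<T> flowable(CompletionStage<Page<T>> stage) {
        return Single.fromCompletionStage(stage)
            .flatMapPublisher(page -> {
                Flowable<T> current = Flowable.fromIterable(page.currentPage());
                if (!page.hasMorePages()) {
                    return current; // last page: completes after its own items
                }
                // defer: fetchNextPage() runs only once `current` has completed
                Flowable<T> rest = Flowable.defer(() -> flowable(page.fetchNextPage()));
                return current.concatWith(rest);
            });
    }

    /** In-memory pages backed by a list of lists, already completed. */
    public static <T> CompletionStage<Page<T>> fakePages(List<List<T>> pages, int index) {
        Page<T> page = new Page<T>() {
            @Override public Iterable<T> currentPage() { return pages.get(index); }
            @Override public boolean hasMorePages() { return index + 1 < pages.size(); }
            @Override public CompletionStage<Page<T>> fetchNextPage() { return fakePages(pages, index + 1); }
        };
        return CompletableFuture.completedFuture(page);
    }

    public static void main(String[] args) {
        List<List<Integer>> pages = Arrays.asList(
            Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5));
        // toList() only returns once onComplete has been signalled
        System.out.println(flowable(fakePages(pages, 0)).toList().blockingGet());
    }
}
```

If this completes but the real query does not, the difference is more likely in how the final page's CompletionStage or hasMorePages() behaves than in the Flowable composition itself.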
this instanceof Fuseable and then cast to QueueSubscription along the way