These are chat archives for ReactiveX/RxJava
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
upstream.cancel()once inner subscriber is done. What's the reason for that ?
Flowable.just(Boolean.TRUE) .repeat() .concatMap(token -> Flowable.just("blah").doOnCancel(() -> System.out.println("Cancelled"))) .subscribe();
Flowable.flatMap()does not have this issue(?).
dispose()impl ...But then you are subject to operators behaviors (like concatMap() which cancel()/dispose() upstream once completed)
reactor-nettyimpl do close the socket on dispose: https://github.com/reactor/reactor-netty/blob/56041f7492768287b77c90d9d140c26ab6f6f847/src/main/java/reactor/netty/DisposableChannel.java#L73
flatMap(foo, 1)but it's probably less efficient ...)
cancel()upstream unless they're propagating a downstream
cancel()invocation OR if it's the expected behavior of the operator to limit the upstream (i.e:
concatMap()exposed by my snippet should be considered as a bug.
flatMap(foo, 1)hack seems to have some serious performance impact :-(
DropCanceloperator and kept the
concatMap(). It seems to have less performance impact.
If I emits concurrently by call
Observable.create(delegate) .buffer(1, TimeUnit.SECONDS, MessageProducer.LMT) .filter(CollectionUtil::isNotEmpty)
delegate.onNext， will it lost messages?