All generic questions around Reactor. For advanced questions you can also try #reactor-core and #reactor-netty
Does anybody know if passing Mono/Flux around in the project is the right way of doing chaining? It seems like I'm losing performance on all these Reactor wrappers. In the profiler I see a call stack like this one: https://snipboard.io/fXT6KW.jpg
I see calls as deep as 150 nested Reactor invocations.
flatMap returns Mono<ResponseEntity> and I can't pass the original AMQP message to map.
RabbitFlux.createReceiver().consumeManualAck(QUEUE_NAME, consumeOptions)
    .flatMap(amqpMessage -> {
        String url = new String(amqpMessage.getBody());
        return webclient.get().uri(url).retrieve().toBodilessEntity();
    })
    .map(responseEntity -> {
        if (responseEntity.getStatusCode().is2xxSuccessful()) {
            // how do I pass amqpMessage in here?
            amqpMessage.ack();
        } else {
            amqpMessage.nack();
        }
        return amqpMessage;
    }).subscribe();
var sourceFlux = Flux.range(1, 10);
var hotflux1 = sourceFlux.publish().refCount(2);
var firstFlux = hotflux1.map(i -> i * 2).log("First");
// this is an IO blocking call
var secondFlux = hotflux1
    .publishOn(Schedulers.boundedElastic()).map(i -> i * 5).log("Second");
secondFlux.subscribe();
//
var firstMono = Flux.just(300).log("First Mono");
var concatedFlux = Flux.concat(firstFlux, firstMono);
concatedFlux.subscribe();
I am using an approach similar to this to lift a reactor context into ThreadLocal for use by a grpc client interceptor.
https://stackoverflow.com/questions/47501349/using-hooks-and-a-lift-to-push-context-into-threadlocal
I am getting some stale values due to some bugs in the way I am using it. My main concern with this approach is its dependence on the onNext call to lift a fresh context into the ThreadLocal. If used incorrectly (namely, not doing a contextWrite on a stream), it can lead to leaked values from other threads.
One way I thought of to add more safety would be to clear the ThreadLocal in the onComplete or onError of the subscriber. From my initial inspection it appears this works because, so far, onComplete and onError are invoked on the same thread as onNext.
Question: Is it guaranteed that onComplete and onError are called on the same thread as onNext for a given subscriber like this?
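The stale-value risk described above does not depend on Reactor itself. Here is a minimal plain-JDK sketch of how a value set on a pooled thread stays visible to the next unrelated task unless it is cleared. The class name and the REQUEST_ID holder are hypothetical, and a single-thread executor stands in for a scheduler worker. It illustrates the leak only and does not answer the threading-guarantee question.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalLeakDemo {
    // Hypothetical holder standing in for the ThreadLocal the lifted subscriber writes to.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Runs two tasks on the same pooled thread. The second task never sets a
    // value and simply returns whatever it finds in the ThreadLocal.
    static String valueSeenBySecondTask(boolean clearAfterUse) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable first = () -> {
                REQUEST_ID.set("request-1");
                try {
                    // ... work that reads REQUEST_ID would go here ...
                } finally {
                    if (clearAfterUse) {
                        REQUEST_ID.remove();
                    }
                }
            };
            pool.submit(first).get();

            Callable<String> second = REQUEST_ID::get;
            return pool.submit(second).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(valueSeenBySecondTask(false)); // request-1 (stale value leaked)
        System.out.println(valueSeenBySecondTask(true));  // null
    }
}
```

Clearing in a finally-style hook only helps if the cleanup runs on the thread that did the set, which is exactly the property the question asks about.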
public static Flux<ByteBuffer> chunker(Flux<ByteBuffer> data, int chunkSize) {
    return Flux.defer(() -> {
        AtomicLong countDown = new AtomicLong(chunkSize);
        return data.bufferUntil(buf -> {
            if (countDown.addAndGet(-buf.remaining()) <= 0L) {
                countDown.set(chunkSize);
                return true;
            }
            return false;
        }).concatMap(bufList -> {
            int size = bufList.stream().mapToInt(ByteBuffer::remaining).sum();
            return Mono.just(bufList.stream().reduce(ByteBuffer.allocate(size), ByteBuffer::put));
        });
    });
}
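The merge step inside concatMap can be checked in isolation without Reactor. Here is a minimal JDK-only sketch (BufferMergeDemo and merge are hypothetical names) of combining a list of buffers into one. Note that after a series of put calls the target buffer's position sits at its limit, so remaining() is 0; a consumer that reads from the current position would see no bytes unless flip() is called somewhere. Whether that matters for the chunker above depends on how its output is consumed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class BufferMergeDemo {
    // Copies every buffer in the list into one newly allocated buffer and
    // flips it so the result is readable from position 0.
    static ByteBuffer merge(List<ByteBuffer> buffers) {
        int size = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
        ByteBuffer merged = ByteBuffer.allocate(size);
        for (ByteBuffer buf : buffers) {
            // duplicate() shares the content but has its own position,
            // so the caller's buffers are left untouched.
            merged.put(buf.duplicate());
        }
        merged.flip();
        return merged;
    }

    public static void main(String[] args) {
        ByteBuffer a = ByteBuffer.wrap("abc".getBytes(StandardCharsets.US_ASCII));
        ByteBuffer b = ByteBuffer.wrap("de".getBytes(StandardCharsets.US_ASCII));
        ByteBuffer merged = merge(List.of(a, b));
        System.out.println(merged.remaining());                        // 5
        System.out.println(StandardCharsets.US_ASCII.decode(merged));  // abcde
    }
}
```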
Is there a straightforward way to achieve a graceful flush/shutdown with an infinite source? I have a Flux from a KafkaReceiver, and at shutdown I want to stop receiving messages from upstream but allow all of the in-flight work to complete, and then allow the KafkaReceiver to commit its offsets (via the provided ReceiverOffset.acknowledge callback) before the KafkaReceiver closes its underlying Kafka consumer. dispose seems to propagate upstream immediately, canceling all buffered work... and it also induces the KafkaConsumer to be closed before the offsets associated with that work can be committed.
I think this might naturally take the form of arranging for the request signals flowing upstream to the KafkaReceiver to be disabled at shutdown time, but I haven't found a simple way to inject that behavior into the wiring. I'm using a pretty hacky solution that involves publish to a ConnectableFlux, with a bogus consumer that stops requesting ... but this seems pretty heavyweight and unnatural. Any suggestions?
Hi everyone, I have been using Mono.zip a lot in my application, and since I come from a Node.js background, I assumed it works like Promise.all. But I recently found that if one of the Monos passed to Mono.zip returns empty, the map won't run.
Reproduction:
fun main() {
    val m1 = Mono.just("A")
    val m2 = Mono.just("B")
    val m3 = Mono.empty<String>()
    Mono.zip(m1, m2, m3).map {
        println("Data t1: ${it.t1} t2: ${it.t2} t3: ${it.t3}")
    }.subscribe()
}
It doesn't print anything.
Workaround:
fun main() {
    val m1 = Mono.just("A").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m2 = Mono.just("B").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m3 = Mono.empty<String>().map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    Mono.zip(m1, m2, m3).map {
        println("Data t1: ${it.t1.get()} t2: ${it.t2.get()} t3: ${it.t3.orElse(null)}")
    }.subscribe()
}
My requirement is that I have a couple of Monos and I need the output of all of them, even if one is empty. If any of the Monos fails, it can throw an error.
@OlegDokuka, any views on this, buddy?
The term “stream” [...] is a generic term that applies to Java Streams, Observables and even Iterators.
.module file. How come?
ExchangeFilterFunction in WebClient. I declared a filter that runs a backoff retry on error, but for the WebClientResponseException type it doesn't work: the retry mechanism doesn't run, unlike for WebClientRequestException, for which it works. What am I doing wrong?
// creating client
var client = WebClient.builder()
    .defaultHeader(CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
    .baseUrl(serviceUrl)
    .filter(decorateFunction)
    .build();
(...)
// example usage
webClient.delete().uri(deleteUrl).retrieve()
    .bodyToMono(XXX.class)
    .doOnSuccess(consumeSuccess())
    .doOnError(consumeError())
    .subscribe();
(...)
// `decorateFunction` looks like this
public interface ReactiveRequestDecorator extends ExchangeFilterFunction {
    /**
     * Decorate the given request.
     *
     * @param request the request to decorate
     * @param <T> the type of result returned
     * @return decorated request
     */
    <T> Mono<T> decorate(Mono<T> request);

    @Override
    default Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        return decorate(next.exchange(request));
    }
}
// example implementation
(...)
@Override
public <T> Mono<T> decorate(Mono<T> request) {
    return Mono.defer(() -> request)
        .retryWhen(Retry.backoff(3, Duration.ofMillis(200)));
}
(...)
Hi all. Is there any difference regarding memory usage when using share() in the following code?
final Path file = [...];
final Flux<DataBuffer> buffer = this.webClient.get().uri(url).retrieve().bodyToFlux(DataBuffer.class).retry(3);
// Option 1 without calling share()
DataBufferUtils.write(buffer, file, StandardOpenOption.CREATE).block();
// Option 2 with calling share()
DataBufferUtils.write(buffer, file, StandardOpenOption.CREATE).share().block();
My application has to download a lot of files and I want to use as little RAM as possible.
Hi all, has anyone used Flow, coroutines, and WebFlux together in production? I'd like to know how the experience was. I have a Spring WebFlux application that I want to convert to Flow and WebFlux. The benefit would be asynchronous code that looks imperative.
https://spring.io/blog/2019/04/12/going-reactive-with-spring-coroutines-and-kotlin-flow
Hello all, I need to investigate the cause of a subscription cancellation in a WebFlux application. How can I log or determine the cause of a cancel event? The following code is called from the RestController:
Mono.fromCallable(() -> {
        // do some blocking stuff
        log.info("creating some value");
        return someValue;
    })
    .doOnNext(p -> log.info("someValue created: {}", p))
    .doOnError(throwable -> log.error("Something went wrong on someValue creation", throwable))
    .doOnCancel(() -> log.error("cancelled on something ..."))
and produces the following log:
"message":"| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblyConditionalSubscriber)","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"nioEventLoopGroup-3-2"
"message":"| request(unbounded)","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"nioEventLoopGroup-3-2"
"message":"onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)","logger_name":"reactor.Mono.CallableOnAssembly.223","thread_name":"nioEventLoopGroup-3-2"
"message":"request(unbounded)","logger_name":"reactor.Mono.CallableOnAssembly.223","thread_name":"nioEventLoopGroup-3-2"
"message":"| cancel()","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"reactor-http-epoll-4",
"message":"cancel()","logger_name":"reactor.Mono.CallableOnAssembly.223","thread_name":"reactor-http-epoll-4",
"message":"cancelled on something ...","logger_name":"domain.Service","thread_name":"reactor-http-epoll-4"
"message":"creating some value","logger_name":"domain.Service2", "thread_name":"nioEventLoopGroup-3-2"
any help would be appreciated! Regards, Gena
Hello. I was reading the code for EmitterProcessor and wanted to ask a question.
Why do we use a loop for synchronization in the add / remove methods?
If SUBSCRIBERS were stored in a Map structure, it seems System.arraycopy would not have to be called every time in the add method, and the remove method would not have to traverse the array to find the element.
Maybe there is a reason I didn't think of. If you let me know, I'd appreciate it.
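For readers unfamiliar with the pattern being asked about, here is a simplified JDK-only sketch of a copy-on-write array guarded by a compare-and-set retry loop. It is not Reactor's actual source, and the class and method names are hypothetical. One common motivation for this design is that adding and removing subscribers is rare while emitting to them is frequent, so the emit path only needs a single volatile read followed by iteration over an array that is never mutated in place.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

public class CopyOnWriteSubscribers<T> {
    private final AtomicReference<Object[]> subscribers =
            new AtomicReference<>(new Object[0]);

    // Builds a new array and retries until compareAndSet succeeds,
    // i.e. until no other thread replaced the array in the meantime.
    public void add(T subscriber) {
        for (;;) {
            Object[] current = subscribers.get();
            Object[] next = Arrays.copyOf(current, current.length + 1);
            next[current.length] = subscriber;
            if (subscribers.compareAndSet(current, next)) {
                return;
            }
        }
    }

    // Same retry loop; returns false if the subscriber is not present.
    public boolean remove(T subscriber) {
        for (;;) {
            Object[] current = subscribers.get();
            int index = -1;
            for (int i = 0; i < current.length; i++) {
                if (current[i] == subscriber) {
                    index = i;
                    break;
                }
            }
            if (index < 0) {
                return false;
            }
            Object[] next = new Object[current.length - 1];
            System.arraycopy(current, 0, next, 0, index);
            System.arraycopy(current, index + 1, next, index, current.length - index - 1);
            if (subscribers.compareAndSet(current, next)) {
                return true;
            }
        }
    }

    // Hot path: a snapshot that callers can iterate without locking.
    public Object[] snapshot() {
        return subscribers.get();
    }

    public int size() {
        return subscribers.get().length;
    }
}
```

The loop is not a spin lock: each iteration either publishes a complete new array or detects a concurrent change and starts over from the latest array.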
Hello,
I want to build a system where multiple subscribers can subscribe to individual events, something like different queues in a message broker, and a publisher that just sends all messages to a single pile.
What I am trying to understand is how to organize the routing between subscribers so that each one consumes only the messages addressed to that specific consumer.
In case it helps, I am using reactive Spring.
Hello,
I am using a unicast sink like this. In the case of NON_SERIALIZED emits, the emitNext() method blocks until consumeItem completes. Why is this happening?
private final Sinks.Many<String> inputSink = Sinks.many().unicast().onBackpressureBuffer(queue);

// On publisher thread pool A
public Mono<Void> produceItem(String item) {
    log.info("Emit Next");
    // emitNext is blocking on NON_SERIALIZED emits. Why?
    inputSink.emitNext(item, getEmitFailureHandler());
    log.info("Emit Done");
    return Mono.empty();
}

// Subscriber thread pool B
public Flux<String> consumeItem() {
    return inputSink.asFlux()
        .publishOn(Schedulers.boundedElastic())
        .flatMap(...);
}
I am trying to stress test a client-server system by submitting 1000 messages from the client, each as a Mono, but doing 10 simultaneously. My code is:
Flux.range(0, 1000) // send a total of 1000 messages
    .flatMap(i -> sendMessage())
    .limitRate(10) // 10 at a time
    .collectList()
    .block();
But it isn't working. Instead, the client initially submits 256 simultaneously.
The FluxFlatMap.FlatMapMain.onSubscribe(Subscription s) method uses:
s.request(Operators.unboundedOrPrefetch(maxConcurrency));
And maxConcurrency is 256.
It doesn't matter what values I use in limitRate, e.g., limitRate(2,1)
This is caused by the flatMap method, which I call with default concurrency and prefetch:
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
    return flatMap(mapper, Queues.SMALL_BUFFER_SIZE, Queues.XS_BUFFER_SIZE);
}
I tried moving the limitRate() method in front of flatMap but get the same behavior.
So limitRate doesn't do anything, and the code needs to be:
Flux.range(0, 1000)
    .flatMap(i -> sendMessage(), 10, 10)
    // .limitRate(10) has no effect
    .collectList()
    .block();
Anyone have an explanation?
Folks, I just noticed that the following code in my main data stream causes significant performance degradation (~2X):
.timeout(inactivityTimeout, Flux.defer {
    logger.trace("Stopping job after $inactivityTimeout of inactivity.")
    Flux.empty()
})
Async-profiler shows that a significant amount of time is spent in the java/util/concurrent/locks/LockSupport.unpark method. What am I missing here?
reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response
wiretap: true does not show anything suspicious.
httpClient.secure for configuring mutual authentication.
I recently switched my app over from webmvc to webflux, and tried to configure this:
@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
            .allowedOrigins("*");
    }

    @Bean
    CorsWebFilter corsWebFilter() {
        var corsConfig = new CorsConfiguration();
        corsConfig.setAllowedOrigins(List.of("*"));
        var source = new UrlBasedCorsConfigurationSource();
        source.registerCorsConfiguration("/**", corsConfig);
        return new CorsWebFilter(source);
    }
}
and also tried adding @CrossOrigin to my controller.
But I'm still not getting the cross-origin response headers.
Anyone know the fix?
Hi :wave:. I'm an engineer with Couchbase, and I'm struggling with adding full thread-safety to our transactions library that is based around reactor. For some context, here is a sample transaction:
transactions.reactive().run((ctx) -> {
    // This chain is specified by the user, saying what they want to happen in the transaction.
    return Flux.fromIterable(ids)
        .parallel()
        .runOn(Schedulers.elastic())
        // User is doing concurrent INSERTs transactionally
        .flatMap(id -> ctx.query("INSERT..."))
        .then();
}).subscribe();
The ctx.query() internally needs to take a mutex lock for part of the work. (I’ve written a reactive-compatible mutex that keeps a list of lock-waiters - Sinks).
The problem is if one of those concurrent query operations fails. I understand that a stream can only raise one error. How this seems to be implemented is that if an error is raised past the concatMap, then all the remaining concurrent query operations simply stop at the end of their current operator - no error or complete signal is raised on them. This understanding is based on trial-and-error - am I correct on this?
This is causing me some real issues as I need those concurrent query ops to do some tidyup before dying - namely they have to remove themselves from the mutex (e.g. unlocking it if they have the lock, removing themselves from the list of waiters if they don’t). Otherwise subsequent rollback, which also needs the lock, deadlocks.
On non-query operations I use a workaround of waiting for the other ops to complete, before propagating the error (and also reject any new operations). But this doesn’t work for query, as some query errors are potentially not fatal to the transaction, and need to be raised out of the ctx.query() to give the user the chance to ignore it.
I hope that makes sense, let me know if there’s anything I need to clarify.
So - I’m not sure exactly what my request is. If I had a way to do some work just before an operation dies, that would work for me. Or perhaps there is an alternative way of doing things that I’m missing here - I feel this must be a relatively common situation, e.g. concurrent ops needing to do cleanup if one of them fails. Please let me know if you have any ideas.
limitRate. We see far more requests than expected. Right now our service is dying with out-of-memory errors because the repository is too slow. What are we missing here? Any advice?
clientA.retrieveFoos()
    .flatMap { foo -> clientB.retrieveBars(foo) }
    .flatMap { someSlowRepository.save(it) }
    .limitRate(10)
Issue with Spring WebClient subscribe(...)
I have set up a Spring Cloud Stream application. Here is my consumer function:
@Bean
public Consumer<Model> rssConsumer(final Handler handler) {
    return model -> {
        if (model != null) {
            LOGGER.info("Received url: '{}' ", model.getUrl());
            handler.process(model.getUrl());
        }
    };
}
I can see that I have received 752 URLs and that each one reaches the webClient call, but in the subscribe block I am receiving 500 responses. I don't understand why I am not getting a response for every URL even though I received 752 messages.
Here is my webClient code as well as the webClient configuration:
public void execute(final String url) {
    webClient.get().uri(url)
        .exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class)
            .flatMap(responseBody -> Mono
                .just(new Model(responseBody, clientResponse.statusCode().value()))))
        .onErrorResume(exception -> Mono
            .just(new Model(exception.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR.value())))
        .subscribe(model -> {
            // save data in file
        });
}

@Bean
public HttpClient httpClient() {
    return HttpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
        .responseTimeout(Duration.ofMillis(30000))
        .doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(30000, TimeUnit.MILLISECONDS))
            .addHandlerLast(new WriteTimeoutHandler(30000, TimeUnit.MILLISECONDS)));
}

@Bean
public WebClient webClient(HttpClient httpClient) {
    return WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();
}
Schedulers.onHandleError changed between reactor-core versions 3.3.5 and 3.4.12? We were on Spring Boot 2.3.x (and reactor 3.3.5) before and had a Spring Integration Test that threw an error from one of the dependencies used in the reactor chain. The test expected the Schedulers.onHandleError method to handle the error, but after upgrading to Spring 2.6.1 (and reactor 3.4.12), the test has started failing. Upon debugging, I see that Hooks.onErrorDropped is receiving the error. Wanted to know your thoughts on this. Thanks!
Hi folks. I noticed an interesting thing today. Consider the following example:
val source = Flux.concat(
    Mono.fromCallable { "one" },
    Mono.fromCallable { "two" },
    Mono.fromCallable { "three" },
)
source
    .timeout(Duration.ofSeconds(5), Flux.defer {
        logger.trace("Stopping job after 5 seconds of inactivity.")
        Flux.empty()
    })
    .subscribe()
It turned out that Mono.fromCallable { "three" } may never get to materialize if source was cancelled by timeout() downstream. Can I do something about it? My goal is to have a finalizing Flux at the end of the source that must emit some event.