John Meehan
That's the path I'm headed down. Coming from Akka streams and Scala, keeping external state seemed like the wrong approach. But I see from other examples that's not out of the norm here. Something like?
    public static Flux<ByteBuffer> chunker(Flux<ByteBuffer> data, int chunkSize) {
        return Flux.defer(() -> {
            // Per-subscription countdown of the bytes left until the next chunk boundary.
            AtomicLong countDown = new AtomicLong(chunkSize);
            return data.bufferUntil(buf -> {
                if (countDown.addAndGet(-buf.remaining()) <= 0L) {
                    countDown.set(chunkSize); // start counting the next chunk
                    return true;
                }
                return false;
            }).map(bufList -> {
                int size = bufList.stream().mapToInt(ByteBuffer::remaining).sum();
                ByteBuffer merged = ByteBuffer.allocate(size);
                bufList.forEach(merged::put);
                merged.flip(); // make the merged bytes readable
                return merged;
            });
        });
    }
4 replies
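A note on the merge step in the chunker: ByteBuffer.put advances the position of the destination buffer, so a buffer that was just filled has nothing left to read until flip() is called. The following is a minimal JDK-only sketch of that merge, assuming the goal is one readable buffer per chunk. ByteBufferMerge is a hypothetical helper name and is not part of Reactor.

```java
import java.nio.ByteBuffer;
import java.util.List;

/** Hypothetical helper (not part of Reactor): merges several buffers into one readable buffer. */
final class ByteBufferMerge {

    private ByteBufferMerge() {
    }

    static ByteBuffer merge(List<ByteBuffer> parts) {
        // Total number of readable bytes across all parts.
        int size = parts.stream().mapToInt(ByteBuffer::remaining).sum();
        ByteBuffer merged = ByteBuffer.allocate(size);
        for (ByteBuffer part : parts) {
            // duplicate() shares the content but has its own position,
            // so the caller's buffers are not consumed by the copy.
            merged.put(part.duplicate());
        }
        // After the puts, position == size; flip() sets limit = size and position = 0.
        merged.flip();
        return merged;
    }
}
```

Inside the chunker this could replace the reduce call, for example as .map(ByteBufferMerge::merge) after bufferUntil.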
Hi, how do I implement throttling in Reactor?
Daniel Wahlqvist
Can I do a nested groupBy? I have several pieces of equipment, and each one has different metrics. So I would like to do a groupBy for each piece of equipment and then an inner groupBy for each type of metric. That might be the wrong approach, of course, but I'm mostly trying to get started and learn.
2 replies
josh gruenberg

is there a straightforward way to achieve a graceful flush/shutdown with an infinite source? I have a Flux from a KafkaReceiver, and at shutdown, I want to stop receiving messages from upstream, but allow all of the in-flight work to complete, and then allow the KafkaReceiver to commit its offsets (via the provided ReceiverOffset.acknowledge callback), before the KafkaReceiver closes its underlying kafka consumer. dispose seems to propagate upstream immediately, canceling all buffered work... and it also induces the KafkaConsumer to be closed before the offsets associated with that work can be committed.

I think this might naturally take the form of arranging for the request signals flowing upstream to the KafkaReceiver to be disabled at shutdown time, but I haven't found a simple way to inject that behavior into the wiring. I'm using a pretty hacky solution that involves publish to a ConnectableFlux, with a bogus consumer that stops requesting ... but this seems pretty heavyweight and unnatural. Any suggestions?

3 replies

Hi everyone, I have been using Mono.zip a lot in my application. Since I come from a Node.js background, I assumed it works like Promise.all, but I recently found that if one of the Monos passed to Mono.zip completes empty, the combined Mono never emits.


fun main() {
    val m1 = Mono.just("A")
    val m2 = Mono.just("B")
    val m3 = Mono.empty<String>()

    Mono.zip(m1, m2, m3).map {
        println("Data t1: ${it.t1} t2: ${it.t2} t3: ${it.t3}")
    }.subscribe()
}

It doesn't print anything.


fun main() {
    val m1 = Mono.just("A").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m2 = Mono.just("B").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m3 = Mono.empty<String>().map { Optional.of(it) }.defaultIfEmpty(Optional.empty())

    Mono.zip(m1, m2, m3).map {
        println("Data t1: ${it.t1.get()} t2: ${it.t2.get()} t3: ${it.t3.orElse(null)}")
    }.subscribe()
}

My requirement is that I have a couple of Monos and I need the output of all of them, even if one is empty. If any of the Monos fails, it is fine for the whole thing to throw an error.

@OlegDokuka, any thoughts on this?

1 reply
Salathiel Genèse
I think you're doing it right, Zakir, since a null value results in an empty Mono.
3 replies
Daniel Wahlqvist
I have a flux and I want to dynamically tell it how to transform the data. My idea is to join the flux with another "command flux" and adapt the processing depending on the last command. Is this idea similar to a real and tested pattern someone else has used? I suspect you guys can give me some hints. I need to figure out how to implement this, and I need to validate whether it's a good pattern at all.
3 replies
Daniel Wahlqvist
I've been reading the API documentation and feel a little overwhelmed but also impressed and curious. Now I'm wondering; what is Reactor? Sure, I get it's a reactive programming framework but what I haven't anticipated is what such a framework must evolve into, if it's successful. I didn't realize this before but now I believe a successful reactive programming framework MUST in the long run evolve into a successful stream processing framework. That might be a strange claim, and I know that's not what many people believe. E.g. this attitude is common: https://www.tikalk.com/posts/2017/12/06/streaming-vs-reactive/ Given all this; what is Reactor and what is it evolving into?
Salathiel Genèse
That article just sparks confusion
The term “stream” [...] is a generic term that applies to Java Streams, Observables and even Iterators.
1 reply
Kirill Marchuk
Hi all. I am a bit confused with Gradle Metadata. In the .pom artifact from here: https://repo1.maven.org/maven2/io/projectreactor/reactor-core/3.3.5.RELEASE/, it says "published-with-gradle-metadata". but the folder for this release (same URL) does not contain any .module file. How come?
Hi! I have a problem with using ExchangeFilterFunction in WebClient. I declared a filter that runs a backoff retry on error, but it doesn't work for WebClientResponseException: the retry mechanism does not run. For WebClientRequestException, on the other hand, it works. What am I doing wrong?
//creating client 
var client = WebClient.builder()
               .defaultHeader(CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
//example of using
// `decorateFunction` looks like this
public interface ReactiveRequestDecorator extends ExchangeFilterFunction {

    /**
     * Decorate the given request.
     * @param request the request to decorate
     * @param <T> the type of result returned
     * @return decorated request
     */
    <T> Mono<T> decorate(Mono<T> request);

    @Override
    default Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        return decorate(next.exchange(request));
    }
}

// example implementation
public <T> Mono<T> decorate(Mono<T> request) {
        return Mono.defer(() -> request)
                .retryWhen(Retry.backoff(3, Duration.ofMillis(200)));
}
1 reply
Patrick Gotthard

Hi all. Is there any difference regarding memory usage when using share() in the following code?

final Path file = [...];
final Flux<DataBuffer> buffer = this.webClient.get().uri(url).retrieve().bodyToFlux(DataBuffer.class).retry(3);

// Option 1 without calling share()
DataBufferUtils.write(buffer, file, StandardOpenOption.CREATE).block();

// Option 2 with calling share()
DataBufferUtils.write(buffer, file, StandardOpenOption.CREATE).share().block();

My application has to download a lot of files and I want to use as little RAM as possible.


Hi all, has anyone used Flow, coroutines, and WebFlux together in production? I would like to know how the experience has been. I have a Spring WebFlux application that I want to convert to Flow and WebFlux. The benefit would be asynchronous code that looks imperative.


4 replies
Ghenadii Batalski

Hello all, I need to investigate the cause of a subscription cancellation in a WebFlux application. How can I log or determine the cause of a cancel event? The following code is called from the RestController:

 Mono.fromCallable(() -> {
            // do some blocking stuff
            log.info("creating some value");
            return someValue;
        })
        .doOnNext(p -> log.info("someValue created: {}", p))
        .doOnError(throwable -> log.error("Something went wrong on someValue creation", throwable))
        .doOnCancel(() -> log.error("cancelled on something ..."))

and it produces the following log:

"message":"| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblyConditionalSubscriber)","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"nioEventLoopGroup-3-2"
"message":"| request(unbounded)","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"nioEventLoopGroup-3-2"
"message":"onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)","logger_name":"reactor.Mono.CallableOnAssembly.223","thread_name":"nioEventLoopGroup-3-2"        
"message":"| cancel()","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"reactor-http-epoll-4",
"message":"cancelled on something ...","logger_name":"domain.Service","thread_name":"reactor-http-epoll-4"
"message":"creating some value","logger_name":"domain.Service2", "thread_name":"nioEventLoopGroup-3-2"

any help would be appreciated! Regards, Gena

8 replies
Hi, I have a question about how Spring WebClient works in a typical Spring MVC application. I learnt that Spring Cloud Gateway cannot run on MVC because it relies on Spring WebFlux, and that we should not mix runtime models like Netty and Tomcat. So my question is: how can WebClient run efficiently in a Spring MVC application?
12 replies

Hello. I was reading the code for EmitterProcessor and wanted to ask a question.

Why do we use a loop for synchronization in the add / remove methods?

If 'SUBSCRIBERS' were stored in a Map, it seems that 'System.arraycopy' would not have to be called on every add, and the remove method would not have to traverse the array to find the element.

Maybe there is a reason I didn't think of. If you let me know, I'd appreciate it.

4 replies
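For readers unfamiliar with the pattern the question describes, here is a minimal JDK-only sketch of a copy-on-write subscriber array guarded by a compare-and-set retry loop. It is a general illustration, not Reactor's actual code, and SubscriberArray is a hypothetical name. A commonly cited rationale for this design (offered here as an assumption, not as the maintainers' stated reasoning) is that emitting to subscribers happens far more often than adding or removing them, so the emit path iterates over an immutable array snapshot without any lock, while add and remove pay for a copy and retry only if another thread changed the array in the meantime.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical illustration of a lock-free copy-on-write subscriber list. */
final class SubscriberArray<T> {

    // The current snapshot; arrays stored here are never mutated after publication.
    private final AtomicReference<Consumer<T>[]> subscribers =
            new AtomicReference<>(SubscriberArray.<T>newArray(0));

    @SuppressWarnings("unchecked")
    private static <S> Consumer<S>[] newArray(int length) {
        return (Consumer<S>[]) new Consumer<?>[length];
    }

    void add(Consumer<T> subscriber) {
        for (;;) {
            Consumer<T>[] current = subscribers.get();
            Consumer<T>[] next = Arrays.copyOf(current, current.length + 1);
            next[current.length] = subscriber;
            if (subscribers.compareAndSet(current, next)) {
                return;
            }
            // Another thread replaced the array first: retry against the new snapshot.
        }
    }

    boolean remove(Consumer<T> subscriber) {
        for (;;) {
            Consumer<T>[] current = subscribers.get();
            int index = -1;
            for (int i = 0; i < current.length; i++) {
                if (current[i] == subscriber) {
                    index = i;
                    break;
                }
            }
            if (index < 0) {
                return false; // not subscribed (or already removed)
            }
            Consumer<T>[] next = newArray(current.length - 1);
            System.arraycopy(current, 0, next, 0, index);
            System.arraycopy(current, index + 1, next, index, current.length - index - 1);
            if (subscribers.compareAndSet(current, next)) {
                return true;
            }
        }
    }

    void emit(T value) {
        // Hot path: a plain array read and iteration, no locking.
        for (Consumer<T> subscriber : subscribers.get()) {
            subscriber.accept(value);
        }
    }

    int size() {
        return subscribers.get().length;
    }
}
```

A concurrent Map would avoid the copy on add and the scan on remove, at the cost of a heavier structure to iterate on every emitted element.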
Alexey Anufriev


I want to build a system where multiple subscribers can subscribe to individual events, something like different queues in a message broker, and a publisher that just sends all messages to a single pile.

What I am trying to understand is how to organize this routing between subscribers so each consumes only messages addressed to this concrete consumer.

If that may help, I am using reactive spring.



I am using a unicast sink like this. In the case of NON_SERIALIZED emits, the emitNext() method blocks until consumeItem completes. Why is this happening?

private final Sinks.Many<String> inputSink = Sinks.many().unicast().onBackpressureBuffer(queue);

// On publisher thread pool A
public Mono<Void> produceItem(String item) {
        log.info("Emit Next");
        // emitNext is blocking on NON_SERIALIZED emits. Why?
        inputSink.emitNext(item, getEmitFailureHandler());
        log.info("Emit Done");
        return Mono.empty();
}

//subscriber Thread Pool B
 public Flux<String> consumeItem() {
        return inputSink.asFlux()
1 reply

I am trying to stress test a client-server system by submitting 1000 messages from the client, each as a Mono, but doing 10 simultaneously. My code is:

Flux.range(0, 1000)  //send a total of 1000 messages
    .flatMap(i -> sendMessage())
    .limitRate(10)   //10 at a time

But it isn't working. Instead, the client initially submits 256 simultaneously.

The FluxFlatMap.FlatMapMain.onSubscribe(Subscription s) method uses:


And maxConcurrency is 256.

It doesn't matter what values I use in limitRate, e.g., limitRate(2,1)

This is caused by the flatMap method, which I call with default concurrency and prefetch:

public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
    return flatMap(mapper, Queues.SMALL_BUFFER_SIZE, Queues

I tried moving the limitRate() method in front of flatMap but get the same behavior.

So limitRate doesn't do anything, and the code needs to be:

Flux.range(0, 1000)
    .flatMap(i -> sendMessage(), 10, 10)
    //.limitRate(10) has no effect

Anyone have an explanation?


Folks, I just noticed that the following code in my main data stream causes significant performance degradation (~2X)

.timeout(inactivityTimeout, Flux.defer {
    logger.trace("Stopping job after $inactivityTimeout of inactivity.")

Async-profiler shows that a significant amount of time is spent in the java/util/concurrent/locks/LockSupport.unpark method. What am I missing here?

14 replies
Mantas Matūzas
Hello, we started to get WebClient exception reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response
Using wiretap: true does not show anything suspicious.
Error is thrown randomly. We are using httpClient.secure for configuring mutual authentication.
Is there anything that can be done?
2 replies

I recently switched my app over from webmvc to webflux, and tried to configure this

public class WebConfig implements WebFluxConfigurer {

    public void addCorsMappings(CorsRegistry registry) {

    CorsWebFilter corsWebFilter() {
        var corsConfig = new CorsConfiguration();

        var source = new UrlBasedCorsConfigurationSource();
        source.registerCorsConfiguration("/**", corsConfig);

        return new CorsWebFilter(source);

and also tried adding @CrossOrigin to my controller.

But I'm still not getting the cross-origin response headers.

Anyone know the fix?

2 replies
Zane XiaoYe
is there a way to pause Flux.generate() if backpressure is triggered?
Graham Pople

Hi :wave:. I'm an engineer with Couchbase, and I'm struggling with adding full thread-safety to our transactions library that is based around reactor. For some context, here is a sample transaction:

transactions.reactive().run((ctx) -> {

    // This chain is specified by the user, saying what they want to happen in the transaction.
    return Flux.fromIterable(ids)

            // User is doing concurrent INSERTs transactionally
            .flatMap(id -> ctx.query("INSERT..."))

The ctx.query() internally needs to take a mutex lock for part of the work. (I’ve written a reactive-compatible mutex that keeps a list of lock-waiters - Sinks).

The problem is if one of those concurrent query operations fails. I understand that a stream can only raise one error. How this seems to be implemented is that if an error is raised past the concatMap, then all the remaining concurrent query operations simply stop at the end of their current operator - no error or complete signal is raised on them. This understanding is based on trial-and-error - am I correct on this?

This is causing me some real issues as I need those concurrent query ops to do some tidyup before dying - namely they have to remove themselves from the mutex (e.g. unlocking it if they have the lock, removing themselves from the list of waiters if they don’t). Otherwise subsequent rollback, which also needs the lock, deadlocks.

On non-query operations I use a workaround of waiting for the other ops to complete, before propagating the error (and also reject any new operations). But this doesn’t work for query, as some query errors are potentially not fatal to the transaction, and need to be raised out of the ctx.query() to give the user the chance to ignore it.

I hope that makes sense, let me know if there’s anything I need to clarify.

So - I’m not sure exactly what my request is. If I had a way to do some work just before an operation dies, that would work for me. Or perhaps there is an alternative way of doing things that I’m missing here - I feel this must be a relatively common situation, e.g. concurrent ops needing to do cleanup if one of them fails. Please let me know if you have any ideas.

Graham Pople
Nb I've since discovered doOnCancel and ErrorMode/delayErrors after putting a bunch of breakpoints in the reactive code. Exploring those now.
KrishnaRao Veeramachaneni
Hi team, I have a use case where I need to consume a million Kafka messages (reactor-kafka) and call a REST API in parallel (TPS: 120) with multiple instances of a Java app. I'm using the same code as in https://stackoverflow.com/questions/54126263/invoking-non-blocking-operations-sequentially-while-consuming-from-a-flux-includ?rq=1 but it calls the API sequentially, not in parallel.
I'm not sure how to solve this. Can someone suggest an approach?
Thanks in advance.
I tested the Kafka part with 1 million messages and was able to consume them in 10 minutes.
When I integrate the API part, it takes 10 minutes for 1,000 messages.
KrishnaRao Veeramachaneni
Any advice ?
Patrick Hahn
We have two clients (fast) from which we retrieve some data and a repository (slow) where the results are stored. We tried to limit the requests based on repository performance with limitRate, but we see far more requests than expected. Right now our service is dying with out-of-memory errors because the repository is too slow. What are we missing here? Any advice?
            .flatMap { foo -> clientB.retrieveBars(foo) }
            .flatMap { someSlowRepository.save(it) }
3 replies
Salman khandu
Does Spring WebClient support multiple redirects, and if so, how many redirects does it support?
2 replies
Salman khandu

Issue with spring webclient subscribe(...)

I have setup spring cloud stream application. Here is my consumer spring function

    public Consumer<Model> rssConsumer(final Handler handler) {
        return model -> {
            if (model != null) {
                LOGGER.info("Received  url: '{}' ", model.getUrl());

I can see that I received 752 URLs and that they all reach the webClient call, but in the subscribe block I only see 500 responses. I don't understand why I am not getting a response for every URL even though I received 752 messages.
Here is my webClient code as well as the webClient configuration

public void execute(final String url) {
                .exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class)
                        .flatMap(responseBody -> Mono
                                .just(new Model(responseBody, clientResponse.statusCode().value()))))
                .onErrorResume(exception -> Mono
                        .just(new Model(exception.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR.value())))
                .subscribe(Model -> {
                        // save data in file

    public HttpClient httpClient() {
        return HttpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
                .doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(30000, TimeUnit.MILLISECONDS))
                        .addHandlerLast(new WriteTimeoutHandler(30000, TimeUnit.MILLISECONDS)));
    }

    public WebClient webClient(HttpClient httpClient) {
        return WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();
    }
2 replies
Deepak Bhimaraju
Hi there, has the behaviour for Schedulers.onHandleError changed between reactor-core versions 3.3.5 and 3.4.12? We were on Spring Boot 2.3.x (and reactor 3.3.5) before and had a Spring Integration Test that threw an error from one of the dependencies used in the reactor chain. The test expected the Schedulers.onHandleError method to handle the error but after upgrading to Spring 2.6.1 (and reactor 3.4.12), the test has started failing. Upon debugging, I see that Hooks.onErrorDropped is receiving the error. Wanted to know your thoughts on this. Thanks!

Hi folks. I noticed an interesting thing today. Consider the following example:

val source = Flux.concat(
    Mono.fromCallable { "one" },
    Mono.fromCallable { "two" },
    Mono.fromCallable { "three" },

    .timeout(Duration.ofSeconds(5), Flux.defer {
        logger.trace("Stopping job after 5 seconds of inactivity.")

It turns out that Mono.fromCallable { "three" } may never get to materialize if the source is cancelled by timeout() downstream. Can I do something about it? My goal is to have a finalizing Flux at the end of the source that must emit some event.

5 replies
Roman Bykovskiy
Hello guys! Can't understand how to buffer output to use it in batches. Here is the question and example: https://stackoverflow.com/questions/70083756/project-reactor-buffer-with-parallel-execution
3 replies

Hey folks. Curious if you could advise me on my use-case. I have a pool of workers that upload files to a remote server. It's basically Flux that is derived from Sinks.Many<Job> which I convert to a parallel one with predefined parallelism. My goal is to upload all files as fast as possible, not upload each file as fast as possible. You can assume a remote server has unlimited scalability (AWS S3).

My dilemma is that I need to find a balance between the number of files I upload simultaneously and throughput. If parallelism is too low I won't utilize bandwidth fully. If parallelism is too high (= too many chunks being uploaded simultaneously) the throughput of each upload is too low which may cause timeout disconnection.

My question is: can I have dynamic parallelism in ParallelFlux, so that I can change the number of files being uploaded on the fly?

3 replies
Hi Folks,
I am trying to handle exceptions thrown by downstream services, capture the error code as well as the error response message, and return a custom exception to the API. I am new to WebClient and Spring reactive technology. Can someone help me with how to handle custom exceptions in WebClient? Any links or references would be appreciated.

Hi folks, I have a situation where I am using reactive Redis with Lettuce.

I want to execute the MSET command and, at the same time, set the expiry on a different thread. Here is my code.

My question is whether using publishOn with Schedulers.single() is appropriate or overkill.
From what I have seen, publishOn or subscribeOn is mostly used around blocking code. Is there another operator I can use?

I want to execute expireKeysWithCatch separately and do not want to wait for its result.

    inline fun <reified T : Any> putMany(keyValuePair: Map<RedisKey, T>): Mono<Boolean> {
        if (keyValuePair.keys.isEmpty()) return true.toMono()
        val serializedKeyValuePair = mutableMapOf<String, String>()
        keyValuePair.forEach { (redisKey, value) ->
            serializedKeyValuePair[redisKey.value] = objectMapper.serialize(value)
        }
        return reactiveRedisTemplate.opsForValue().multiSet(serializedKeyValuePair)
            .flatMap { Mono.fromCallable { expireKeysWithCatch(keyValuePair.keys) }  }
            .flatMap { it }
    }

    fun expireKeysWithCatch(redisKeys: Iterable<RedisKey>): Mono<Boolean> {
        val redisKeysFlux = Flux.fromIterable(redisKeys)
        return redisKeysFlux.flatMap { redisKey -> expireKey(redisKey) }
2 replies
Benjamin Roth

Hey guys,

I'm unsure whether to ask this here or via the spring-boot folks. I'm giving it a try but feel free to send me away if this is not the proper channel. I'm trying to set up a reactive spring controller capable of returning a reactive multipart response consisting of e.g. application/json and application/octet-stream (application is backed by Spring Boot making use of netty). For now, I have a simple mocked response:

@PostMapping(value = "/multipart")
public MultiValueMap<String, HttpEntity<?>> multipart() {
    MultipartBodyBuilder builder = new MultipartBodyBuilder();
    // mock response for now
    builder.asyncPart("json", Mono.just("{\"motto\": \"fake it til you make it\"}"), String.class);
    builder.asyncPart("audio", Flux.empty(), new ParameterizedTypeReference<byte[]>() {});

    return builder.build();
}

This does not work though as I'm greeted with:

class org.springframework.util.LinkedMultiValueMap cannot be cast to class org.springframework.http.codec.multipart.Part (org.springframework.util.LinkedMultiValueMap and org.springframework.http.codec.multipart.Part are in unnamed module of loader 'app')

Could you guys please give me some pointers on what I am doing wrong here?


Has anyone built a reactive component that works with a persistent queue? Something that can handle "any amount" of pushed objects from upstream where the downstream has limited capacity. My application is a server that receives unbounded input messages with a backend database of finite capacity. Given that the input messages are serializable, we can publish them to a chronicle queue with an immediate "ack" to the client. The backend then subscribes to the queue and pulls as fast as it can.

I can write the individual publish and subscribe components easily. And each side of the queue is easy to make reactive. But the "cleanest" approach would be to integrate this into the reactive pipeline.

Laurent Bovet
What are the thread rules for a custom Flux? For example, I create a Flux with Flux.generate(genFn). Can genFn be blocking? Similarly, could a reactive-streams Publisher block on request(long)? My point is that I consume from a filesystem directory that can be "empty" for quite a while, but I still want backpressure if I start with thousands of documents pending.


I can't believe :S

RIP Stephane, eternal IDOL

Without him I believe Reactor would not exist. We all owe him a lot.

Condolences to his family.

Hi there,
Using Spring Cloud Gateway, I am trying to add a simple GatewayFilter that adds extra parameters to the request body and extra headers before forwarding the request. The request content type is application/x-www-form-urlencoded.
I have not yet familiarized myself with WebFlux, and I was really surprised to see how difficult it is to achieve that simple requirement.
Looking at some example code, it seems that accessing the request body and modifying it requires a lot of ceremony.
I would like to know if there are utility classes that can help with that simple task.
Thanks a lot, and maybe it's time for me to learn Project Reactor.
(I already asked this question in the spring-cloud channel)
I'm looking to act on only the first result from a Flux, without stopping it.
Is there a method like doOnFirstResult(T value)?
I would like to avoid writing a counter.
3 replies
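One way to avoid a hand-written counter, sketched here with the JDK only: wrap the side effect in a Consumer that fires for the first value it sees and ignores the rest. FirstOnly and firstOnly are hypothetical names, not Reactor API. The wrapper is meant to be passed to an operator that accepts a Consumer, such as doOnNext, so the sequence itself keeps flowing.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical helper (not part of Reactor): runs an action for the first value only. */
final class FirstOnly {

    private FirstOnly() {
    }

    static <T> Consumer<T> firstOnly(Consumer<? super T> action) {
        AtomicBoolean seen = new AtomicBoolean(false);
        return value -> {
            // compareAndSet succeeds exactly once, even if values arrive from several threads.
            if (seen.compareAndSet(false, true)) {
                action.accept(value);
            }
        };
    }
}
```

Usage would look like flux.doOnNext(FirstOnly.firstOnly(v -> log.info("first: {}", v))). Note that the flag lives in the returned Consumer, so it is shared by every subscription to that Flux; if each subscriber should see its own "first" value, the chain can be built inside Flux.defer so that a fresh wrapper is created per subscription.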
Salman khandu

We have an application built on top of Spring Cloud Stream. We are processing around 6000 messages, each of which is a URL that we have to process. Here is my code snippet

public Mono<ResponseModel> download(final String url) {
        return webClient.get().uri(url)
                .exchangeToMono(clientResponse -> clientResponse.toEntity(String.class)
                        .map(responseEntity -> new ResponseModel(responseEntity.getBody(), responseEntity.getStatusCode().value(),
                                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))));


// Broker consumer    ( Using spring cloud stream)
public Consumer<Model> consumer() {
    return model -> {
        .onErrorResume(exception -> Mono.just(new ResponseModel(exception.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR.value())))
        .subscribe(responseModel -> {
                  // Some java code processing
                 //db level check 
                 // db save entity 
                 // file save

So consumer() is the entry point that receives messages from the broker for processing. The Consumer itself is blocking, but internally
we are trying to use the non-blocking WebClient with subscribe(...). The subscribe(...) callback contains blocking code as mentioned above: some Java processing, DB operations, and finally saving to a file. Is it correct to combine a non-blocking WebClient with blocking calls inside subscribe(...), or is there a better way to do this? Thank you

1 reply