@bartoszolczyk I do not see a problem at first look at your code. What triggers questions in my head is the use of next().flux() in only one of the three usages of create-and-return-connection: createAndReturnConnection(command).next().flux(). Why is that?

@luvarqpp thanks for asking - my apologies, I did not show the actual reactor implementation - after some deliberation I've split the flux implementation into several smaller methods (they are in a spot below where you are asking)

    protected ReactiveCallImpl createConnectionDetails(MMFilterableCommand command) {

        final var instance = getInstance(command.getApplicationName());
        return ReactiveCallImpl.builder()
    protected String getUrl(InstanceInfoDto instance) {
        return UriComponentsBuilder.newInstance()

and here (after the method mentioned above) I'm adding some logic to manage the fluxes:

    protected Flux<T> manageConnectionConfiguration(ReactiveCall reactiveCall,
            Flux<T> out) {
        return out.doOnError(throwable -> {
                            connections.remove(new ReactiveCallsKey(reactiveCall.getApplicationName(), reactiveCall.getTicker(),
                            log.error("connection error for {} reactive source",
                                    new OperationException(systemExceptionMessage, HttpStatus.INTERNAL_SERVER_ERROR,
                        subscription -> log.debug("subscription established for {} ", reactiveCall.getApplicationName()))
                .doOnComplete(() -> log.debug("subscription completed for {} ", reactiveCall.getApplicationName()))
                .doOnError(throwable -> {
                    log.error("exception occurred connection for {}  {}", reactiveCall.toString(),
                            new ReactiveCallsKey(reactiveCall.getApplicationName(), reactiveCall.getTicker(),

                .doFinally(vinChangedEventSignal -> log.debug("client connection terminated for {}",

Hi all,
I want to delay Flux elements, but the Java process finished with exit code 0.

public static void main(String[] args) {
    Flux.range(1, 10)
            .flatMap(data -> {
                log.info("test : {}", data);
                if (data == 5) {
                    return Flux.error(new RuntimeException("test"));
                }
                return Flux.just(data);
            })
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
            .onErrorResume(e -> {
                return Flux.empty();
            })
            .subscribe();
}

That is my code. I intend the Flux stream to run indefinitely, but it is not working.
Why did my Java process finish?
Please help.
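A minimal, self-contained sketch of the same pipeline (hypothetical class name, backoff shortened for the demo): subscribe() returns immediately, so once main exits the JVM stops even if delayed elements are still pending; a blocking terminal operator such as block()/collectList().block() keeps the main thread alive until the sequence completes.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class DelayedFluxDemo {

    // Each subscription emits 1..4 and then errors at 5; Retry.backoff(3, ...)
    // resubscribes the whole range three times before giving up, so 16 elements
    // reach the downstream in total before onErrorResume swallows the failure.
    static List<Integer> run() {
        return Flux.range(1, 10)
                .flatMap(data -> data == 5
                        ? Flux.<Integer>error(new RuntimeException("test"))
                        : Flux.just(data))
                .retryWhen(Retry.backoff(3, Duration.ofMillis(10)))
                .onErrorResume(e -> Flux.empty())
                .collectList()
                .block(); // blocks the caller; subscribe() alone would not
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```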

1 reply
Harman Singh

Hi team, I am confused by expectNoEvent in StepVerifier.
The following test passes as expected.

  public void testWithDelayElements() {
    StepVerifier.withVirtualTime(() -> Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(1)))

expectNoEvent will correctly throw an AssertionError if we set the duration in delayElements to less than 1 second. This makes sense, since the StepVerifier receives an event sooner than the provided duration.

But if we remove delayElements from the Flux, we expect the test to fail, since the Flux emits the first event immediately, right?

  public void testWithoutDelayElements() {
    StepVerifier.withVirtualTime(() -> Flux.just(1, 2, 3))

However, the test passes. I expected an expectNoEvent AssertionError to be thrown. Am I missing something?

2 replies
I am looking for a Mono.repeatWhenEmpty example. I couldn't understand what the second argument (the repeat factory) is.
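For what it's worth, a minimal sketch of repeatWhenEmpty (hypothetical names, assuming reactor-core on the classpath): the repeat factory receives a Flux&lt;Long&gt; that signals each time the source completed empty, and the publisher you return from it decides if and when the next resubscription happens.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class RepeatWhenEmptyDemo {

    // A source that completes empty on the first two subscriptions,
    // then emits a value on the third.
    static Mono<String> flaky(AtomicInteger calls) {
        return Mono.defer(() ->
                calls.incrementAndGet() < 3 ? Mono.empty() : Mono.just("ready"));
    }

    static String resolve() {
        AtomicInteger calls = new AtomicInteger();
        return flaky(calls)
                // the companion emits once per empty completion; take(5) caps
                // the retries, delayElements spaces the resubscriptions out
                .repeatWhenEmpty(emptied -> emptied
                        .take(5)
                        .delayElements(Duration.ofMillis(10)))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(resolve());
    }
}
```

If the companion completes (here: after 5 empty attempts), the resulting Mono completes empty instead of retrying further.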
Guillaume DROUET
Hello, what is the best approach to consume two sources in parallel and get the values from only one? Something like when(), but I need a Flux from one of the sources as the result.
Hello :)
I'm building a reactive app which uses Spring Events to propagate events in an async way (using ApplicationEventMulticaster or ApplicationEventPublisher with @Async on the event listeners). Neither solution works properly, as sometimes I do not see the events being triggered. Do you know if Spring Events are compatible with spring-reactor? If not, could you please tell me what the alternative to Spring Events on spring-reactor is?
Thanks in advance
Hello guys! I'm new here 😇 could anybody please tell me how to discuss topics here?
Ashok Koyi

I am using WebSockets with the reactor-netty library, and after a minute or two the session gets terminated silently and the application gets stuck, without any error, in the middle of messages being exchanged between HttpClient and the WebSocket server using WebsocketInbound & WebsocketOutbound. This happens while the debug log shows HttpClient is exchanging WebSocket frames. Once this happens, the frame exchange halts suddenly and everything gets stuck silently.

There is no error in the console, no CloseWebSocketFrame (WebsocketInbound#receiveFrames), no WebSocketCloseStatus (WebsocketInbound#receiveCloseStatus), nothing. It just gets stuck, and I get debug statements saying 0 active connections and 0 inactive connections, whereas just a second before, the log shows it is receiving binary frames from the server.

Did anyone else face this issue? How can I debug what's causing this termination?

Ashok Koyi
@violetagg Any idea what the process is to debug this odd issue? (The connection is being removed from the connection pool automatically while messages were still being exchanged between both ends.) I would appreciate any help in this regard.
51 replies
Hi! I'm looking for a technology to use in my reactive project. I'm interested in Reactor, but I'm not sure if it's a good fit, so here are my questions:
  1. Is the scheduler ready to be installed in HA?
  2. Does it have a queueing model implemented?
  3. Does it store tasks (events, definitions)?
  4. Does it remove triggers?
  5. Are the requests cyclic?
  6. Does it have timed triggers?
I would be very grateful for help with this! And thank you for your time!
Hi, can I update the context based on the result of a Mono execution?
James Howe

I'm new to Reactor (and reactive programming) and I have a question about error handling that I hope someone can help with.

I have some Reactor code which uses flatMap to make several asynchronous HTTP calls. After the flatMap, we process the responses to build a final result to return to a user. It is possible that some, or all, of these calls may fail. The most common reason would be that something about the input request was invalid. We want to capture all of the responses, including the error responses.

Looking at the Reactor API, I was hoping to find a method which could substitute a replacement value when an error occurred in a flatMap operation but continue processing the rest of the pipeline; this doesn't seem to exist. onErrorContinue lets me ignore the error and maybe log some data, but I can't push a replacement for the bad item into the pipeline. onErrorResume lets me start a new Flux, but I want the rest of my flatMap to run. I just want to replace the bad result(s) with some error result.

My current code does the following:

    Function<Request, Publisher<ObjectNode>> requester = serviceClient.getAnalysis().apply(authToken, user);
      Flux.fromStream(requestStream(noOfRequests, new Random()))
        .onErrorContinue((ex, response) -> log.error(ex.getClass().getCanonicalName() + ": " + ex.getMessage()))

The 'requester' is a function that takes a request and makes an HTTP call which returns a Publisher. The 'requestStream' method generates a set of test requests, some of which may cause 400-level errors when the request is processed. The expected result is a list of ObjectNode objects containing JSON which is returned to the user.

What I would like to be able to do is handle the 400 errors, create an 'error JSON' object to replace the bad response, as well as log the original exception. Since I'm new to Reactor and reactive programming, I may be overlooking an obvious solution.
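The pattern being described (substituting a per-item fallback without aborting the outer Flux) can be sketched like this, with hypothetical names standing in for the real client; the key point is that onErrorResume is applied to the inner publisher inside flatMap, not to the outer Flux:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PerItemFallbackDemo {

    // Stand-in for the real HTTP call; fails for one input to simulate a 400.
    static Mono<String> callService(int request) {
        return request == 2
                ? Mono.error(new IllegalArgumentException("bad request: " + request))
                : Mono.just("ok:" + request);
    }

    static Flux<String> process() {
        return Flux.range(1, 4)
                // The fallback wraps the INNER publisher, so an error in one call
                // is replaced by an error element and the outer Flux keeps going.
                .flatMap(req -> callService(req)
                        .onErrorResume(ex -> Mono.just("error:" + ex.getMessage())));
    }

    public static void main(String[] args) {
        process().subscribe(System.out::println);
    }
}
```

Each failed inner call is replaced by a fallback element (an 'error JSON' node in the real code), and the original exception can be logged inside the onErrorResume lambda before returning the fallback.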



I'm using Reactor 3.4.18 and have a question about Flux.groupBy. I generated 1000 integers and split them into 100 groups. I expected each group to be processed in a separate thread, but the stream hangs after several integers are processed.

    void shouldGroupByKeyAndProcessInParallel() {
        final Scheduler scheduler = Schedulers.newParallel("group", 1000);

        StepVerifier.create(Flux.fromStream(IntStream.range(0, 1000).boxed())
                .groupBy(integer -> integer % 100)
                .flatMap(groupedFlux -> groupedFlux
                        .subscribeOn(scheduler) // this line doesn't help
                        .doOnNext(integer -> log.info("processing {}:{}", groupedFlux.key(), integer)),
10:47:58.670 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
10:47:58.846 [group-1] INFO com.huawei.hwclouds.coney.spike.FluxGroupByTest - processing 0:0
10:47:58.866 [group-1] INFO com.huawei.hwclouds.coney.spike.FluxGroupByTest - processing 1:1
10:47:58.867 [group-1] INFO com.huawei.hwclouds.coney.spike.FluxGroupByTest - processing 0:100
10:47:58.867 [group-1] INFO com.huawei.hwclouds.coney.spike.FluxGroupByTest - processing 1:101
10:47:58.867 [group-1] INFO com.huawei.hwclouds.coney.spike.FluxGroupByTest - processing 0:200
10:47:58.867 [group-1] INFO com.huawei.hwclouds.coney.spike.FluxGroupByTest - processing 1:201
-------- start hanging ----------

I have changed the flatMap concurrency to 2 to speed up the reproduction.
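For reference, a hang like the one above is the classic groupBy pitfall: if flatMap's concurrency is lower than the number of distinct groups, the groups beyond the limit are never subscribed, groupBy's internal buffer fills up, and the upstream stalls. A minimal sketch (illustrative names and numbers) where the concurrency matches the group cardinality:

```java
import reactor.core.publisher.Flux;

public class GroupByConcurrencyDemo {

    static long process(int groups, int concurrency) {
        return Flux.range(0, 1000)
                .groupBy(i -> i % groups)
                // concurrency must be >= the number of distinct groups, otherwise
                // groups beyond the limit are never drained and the stream hangs
                .flatMap(group -> group.count(), concurrency)
                .count()
                .block();
    }

    public static void main(String[] args) {
        // completes; process(100, 2) would hang once groupBy's buffer fills
        System.out.println(process(100, 100));
    }
}
```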

Pradeep Kumar
Hi folks, I am trying to read the request body in Spring Cloud Gateway before the request reaches the actual downstream services. I know that the request body can be consumed only once, but is there any way to do this?
Is there any documentation to help in writing efficient operators that can participate in operator optimization, fusion, etc.? A lot of the helper classes/interfaces used by Reactor's own operators are package-private.
Hi guys, I am trying to demonstrate the concept of backpressure in reactive streams, and I want to observe/log its effect in an endpoint. Any ideas on how I can demo the effect?
How do I combine multiple fluxes of type T into another flux of type T[]? Details here: https://stackoverflow.com/questions/74049770/best-way-to-combine-multiple-fluxt-into-fluxt/74051618#74051618
Dexter Huang
Hi, are there any use cases demonstrating the Sinks.EmitResult.FAIL_CANCELLED status? - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Sinks.EmitResult.html#FAIL_CANCELLED
Nick Caballero
Hi, what's up with the BlockHound project? Is anyone still watching issues/PRs there? https://github.com/reactor/BlockHound
2 replies
Hi, I am using reactor-rabbitmq. The problem I have is that my receiver does not recover or catch the exception when there is a RabbitMQ delivery acknowledgement timeout: "Channel error on connection <####> : operation none caused a channel exception precondition_failed: consumer ack timed out on channel 1". The receiver thinks the connection is still open, but RabbitMQ has already closed it.
Manvendra Singh

How do I make sure each Flux item is processed in parallel when chaining multiple Flux calls?

here is the code to explain the problem

public class ParallelCheckController {

    List<Item> searchItems() {

        Map<BackendModule, List<Long>> idsByBackend = new HashMap<>(10);
        idsByBackend.put(new BackendModuleOne(),Arrays.asList(11L,12L,13L));
        idsByBackend.put(new BackendModuleTwo(),Arrays.asList(21L,22L,23L));
        idsByBackend.put(new BackendModuleThree(),Arrays.asList(31L,32L,34L));

        //TODO: I want to call all backend concurrently in parallel.
       //But all Modules are called on same thread.
        return Flux

interface BackendModule {
    abstract Mono<Item> searchItem(Long itemId);

    default Flux<Item> searchItems(List<Long> itemIds) {
        sleep(); log(itemIds);
        //TODO: I want to call all searchItem call concurrently in parallel.
        //But all Items from one module going on same thread.
        return Flux
    //To mimic the actual code latency
    default void sleep() {
        try {Thread.sleep(1000);}
        catch (InterruptedException e) {
            throw new RuntimeException(e);
    //To see the threads in logs
    default void log(Object obj) {
                        + " " + this.getClass().getSimpleName()
                        + " " + obj
 class BackendModuleOne implements BackendModule {
    public Mono<Item> searchItem(Long itemId) {
        sleep(); log(itemId);
        return Mono.just(new Item(itemId));//This is a call to backend WebClient
 class BackendModuleTwo implements BackendModule {
    public Mono<Item> searchItem(Long itemId) {
       return Mono.just(new Item(itemId));//This is a call to backend WebClient
 class BackendModuleThree implements BackendModule {
    public Mono<Item> searchItem(Long itemId) {
        return Mono.just(new Item(itemId));//This is a call to backend WebClient
class Item {
    private final Long id;

Here is the StackOverflow link for more explanation

Abhishek S


I'm looking for some help to figure out the best way to stop sending messages as soon as I get a PoolAcquirePendingLimitException.

package datadidit;

import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException;
import reactor.netty.resources.ConnectionProvider;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class NettyWebClientBackPressureIT {
    private Logger LOG = LoggerFactory.getLogger(NettyWebClientBackPressureIT.class);

    private MockWebServer server;

    private String mockUrl;

    @BeforeEach
    public void setup() throws IOException {
        server = new MockWebServer();

        mockUrl = "http://localhost:" + server.getPort();
        LOG.info("URL:{}", mockUrl);

    /**
     * Scenario: slow-to-respond server, simulating handling of pendingAcquireMaxCount
     */
    @Test
    public void testClientBackPressure() throws InterruptedException {
        // Make a connection pool with a small queue to easily simulate the scenario
        ConnectionProvider.Builder builder = ConnectionProvider.builder("custom")

        HttpClient client = HttpClient.create(builder.build());
        WebClient webClient = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(client))
        // Add some mock responses with slow response times
        for (int i = 0; i < 10; i++) {
            server.enqueue(new MockResponse()
                    .setBody("Hello World")
                    .setHeader("Content-Type", "text/plain")
                    .setBodyDelay(1, TimeUnit.SECONDS));

        for (int i = 0; i < 10; i++) {
            Mono<String> result = getResult(webClient);

            result.subscribe(x -> LOG.info("Response: {}", x));


    public Mono<String> getResult(WebClient webClient) {
        Mono<String> result = webClient.get()
                .doOnError(WebClientRequestException.class, error -> {
                    if (error.getMostSpecificCause() instanceof PoolAcquirePendingLimitException) {
                        LOG.error("Reached max number of PENDING connections, stop sending!");

        return result;

My desired behavior is that, as soon as I get the PoolAcquirePendingLimitException, the for loop above breaks immediately.

Igor Artamonov
Can someone help me with the Reactor context? I have an ugly but working solution, as described at https://stackoverflow.com/questions/74354786/propagate-reactor-context-back, but I want to find a correct way to propagate a context back from an inner publisher. Is that possible?
6 replies
Burak Akça
Does the reactor support IPC?
Hey all, I'm trying to implement something with TcpServer which will receive a large payload (a custom string, potentially XML), do some processing, and reply on the outbound channel based on some data from the inbound. However, as the inbound is chunked, I cannot seem to correctly concat the data together. I have a small gist example here, where I just want to take the large incoming payload and convert it to upper case, but without it being chunked, so the flatMap op would occur only once.
Checking the examples, and the tests in the repo, most just pass back some Mono.just("string"), but I need the large message from the inbound channel in its entirety. I have tried inbound.receive().asString().reduce(String::concat), but with no luck. I believe I need something like JsonObjectDecoder / ChannelInboundHandlerAdapter, but I'm struggling to find any examples of their use in Reactor.
27 replies
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                    //inbound.connection.addHandlerLast() Is there a combiner I can use here?
                         .handle((inbound, outbound) -> {

                         //Tried to concat here, but then the flatMap is never called.
                         //return inbound.receive().asString().reduce(String::concat)

                         return inbound.receive().asString().flatMap(s -> {
                         //this is chunked, but I need the S to be the complete string.
                            NettyOutbound out = outbound.sendString(Mono.just(s.toUpperCase()));
                            return out


Mico Piira

Hey all. The new automatic ThreadLocal propagation feature in Reactor Core 3.5.0 works only for the 'handle' and 'tap' operators. I would like to propagate all thread locals that have been registered with the ContextRegistry from the thread that calls 'block' to all operators.

With these two separate hooks I think I got it working, but I would like to know if there is something wrong with this, or if there is a better way to achieve it.

        Hooks.onEachOperator("propagate", Operators.liftPublisher((publisher, coreSubscriber) -> new CoreSubscriber<>() {
            public Context currentContext() {
                return coreSubscriber.currentContext();
            }

            public void onSubscribe(Subscription s) {
                try (ContextSnapshot.Scope ignored = ContextSnapshot.setAllThreadLocalsFrom(coreSubscriber.currentContext())) {
                    coreSubscriber.onSubscribe(s);
                }
            }

            public void onNext(Object o) {
                try (ContextSnapshot.Scope ignored = ContextSnapshot.setAllThreadLocalsFrom(coreSubscriber.currentContext())) {
                    coreSubscriber.onNext(o);
                }
            }

            public void onError(Throwable throwable) {
                coreSubscriber.onError(throwable);
            }

            public void onComplete() {
                coreSubscriber.onComplete();
            }
        }));

        Hooks.onLastOperator("contextCapture", objectPublisher -> {
            if (objectPublisher instanceof Mono) {
                return Mono.from(objectPublisher).contextCapture();
            }
            if (objectPublisher instanceof Flux) {
                return Flux.from(objectPublisher).contextCapture();
            }
            return objectPublisher;
        });
2 replies
I am looking for a pattern for iterating over the incoming request and attaching the remote service response back to it:
  1. Iterate the Array of items in the incoming request
  2. send the item to remote service to fetch response
  3. update or attach the received response back to the corresponding item in the request
Igor Artamonov
Another silly question: how can I fire a side event after a subscription to a Flux.fromPublisher(sink) is fully propagated to the sink itself? flux.doOnSubscribe() doesn't help because it happens earlier.
The detailed question at https://stackoverflow.com/questions/74537433/fire-event-after-reactor-publisher-subscribed
1 reply
Ivan Vyazmitinov

Hello, dear Reactor community!

In light of the release of Project Loom, and some claims that the reactive approach on the JVM will soon be dead, I decided to write a detailed series of posts describing why that may not be so certain. I have already published 3 out of 4 parts (it struck me very late to share it here) and hope you will find it helpful. And, of course, any feedback is appreciated!

I think it depends on how you use Reactor. If you're primarily using Mono<T> as futures to compose I/O-bound operations in, say, a Spring WebFlux service, then yeah, I think Loom will make a pretty big dent. More complex flows I think will retain the advantage of using Reactor, to the extent that it would be like an async version of Java Streams.
1 reply
Igor Artamonov
I think the main problem that causes people to avoid Reactor is the inability to get good traces and to debug in general. But maybe it can be fixed? Right now it is all based on JVM stack traces, which are useless in such an environment. But maybe Reactor could use a special context to keep a reactor-trace of the execution? I.e., it doesn't stop with Mono.error(Throwable), but with a special type of event that unrolls the execution context and provides a new type of exception. Kind of like what the JVM does to build a stack trace, but with all the Reactor specifics instead. I know there is .checkpoint, but for some reason it's not very helpful. Maybe because it moves the responsibility to the developer.
11 replies
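As a concrete reference for the .checkpoint mentioned above, a minimal sketch (assuming reactor-core; the description string is arbitrary): the description is attached to any upstream error as part of the assembly trace, which is what makes it show up in otherwise unhelpful reactive stack traces.

```java
import reactor.core.publisher.Flux;

public class CheckpointDemo {

    public static void main(String[] args) {
        Flux.just(1, 2, 0)
                .map(i -> 10 / i) // throws ArithmeticException on 0
                // attaches "after-division" to any error raised upstream,
                // visible as a suppressed assembly exception in the trace
                .checkpoint("after-division")
                .subscribe(System.out::println,
                        err -> err.printStackTrace());
    }
}
```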
I think that impacts operational use of Reactor, but I think the bigger issue (by orders of magnitude) is that reactive programming is extremely different from, and much more difficult than, imperative programming in vanilla Java. Kotlin, with coroutines, can smooth that out, but you still end up with the "colored function" problem.
25 replies
Oleh Dokuka
Igor Artamonov
I had a question above, but maybe now is a better time to ask it again. What is the best approach to do something after being fully subscribed to a sink? That "do something" causes a response from that sink, and sometimes it is too fast: if I use .doOnSubscribe, I get the response back through the sink almost immediately, sometimes before it is fully subscribed. So I lose the response, and the thread that sends to the sink reports an error with "no subscriber".
Hello, I'm looking to run a process only on the first element of a Flux.
Is there a method for that?
The only way I found is to do something like:
            Flux.create { emitter ->
                var firstTime = true
                // create a stream
                createStream().map { 
                     if (firstTime) {
                         // ....
                         firstTime = false
                      } else {
                          // ....
8 replies
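A built-in alternative to the mutable-flag approach above is Flux.switchOnFirst, sketched here in Java (the surrounding snippet is Kotlin; names are illustrative): the operator hands you the first signal together with the whole sequence, so the first element can be treated specially without any external state.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class FirstElementDemo {

    static List<String> run() {
        return Flux.just("a", "b", "c")
                // switchOnFirst exposes the first signal AND the full sequence,
                // so the first element can be handled specially without a flag
                .switchOnFirst((signal, flux) -> {
                    if (signal.hasValue()) {
                        System.out.println("first element: " + signal.get());
                    }
                    return flux.map(String::toUpperCase);
                })
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the Flux passed to the transformer still includes the first element, so the returned sequence is complete.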
Philipp Paul

Hi, I want to replay the last element of a Flux and keep the full sequence. Currently I use:

        var f = Flux.range(0, 10);
        var res = Flux.concat(

which seems to work in simple examples and in my tests, but if I run it in an endpoint retrieving Flux<byte[]>, I get a NoSuchElementException. Is this in general a bad idea?

8 replies
Subhalakshmi Krishnasamy
Where can I find a best-practices guide for Reactor usage?
Violeta Georgieva
@Subhalakshmi1986 Did you check https://projectreactor.io/learn

Hi, suppose I have a Sinks.many().unicast().onBackpressureError(), have created a Flux out of it, and am using it as a producer. Now, at a later point in time, I want this first sink to receive messages from another sink; that is, whenever the new/second sink emits a message, the first sink should also emit the same message. Is it possible to achieve this? (If Sinks.many().unicast() is a bad idea and multicast() or replay() would work, I am fine with changing to that too.)

Full usecase:
I'm creating a WebSocket chat server, and when a session is created, I don't know which chat groups the session will subscribe to and receive messages from. The chat groups themselves can be created after a WebSocket session is created.

    public Mono<Void> handle(WebSocketSession session) {
        Sinks.Many<String> unicast = Sinks.many().unicast().onBackpressureError();
        Mono<Void> receiver =
                    //process incoming messages

        Mono<Void> sender =

        return Mono.zip(receiver, sender).then();

If this session later sends a message to join another group (for which I have a Sinks.many().replay().limit(10) and would emit from that sink whenever a message is received for that group), how can I connect the original sink with this one, so that when the second sink emits, the first sink emits the message to the session?
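One way to wire two sinks together, sketched with hypothetical names (and onBackpressureBuffer for the demo): subscribe to the group sink's Flux and forward each signal into the session sink, keeping the Disposable so the session can later leave the group.

```java
import reactor.core.Disposable;
import reactor.core.publisher.Sinks;

public class SinkBridgeDemo {

    // Forward every element the group sink emits into the session sink.
    static Disposable bridge(Sinks.Many<String> group, Sinks.Many<String> session) {
        return group.asFlux()
                .subscribe(session::tryEmitNext,
                        session::tryEmitError,
                        session::tryEmitComplete);
    }

    public static void main(String[] args) {
        Sinks.Many<String> session = Sinks.many().unicast().onBackpressureBuffer();
        Sinks.Many<String> group = Sinks.many().replay().limit(10);

        Disposable membership = bridge(group, session);
        group.tryEmitNext("hello group");

        session.asFlux().subscribe(System.out::println);
        membership.dispose(); // leave the group: stop forwarding
    }
}
```

Disposing the bridge subscription is what models "leaving" a group; the session sink itself stays open for other groups.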

Ganesh Kusundal
Hi Guys,
I am new to Reactor and reactive programming, and I wonder how retry on a Flux works when millions of events need to be replayed.
I also have the following doubts:
  1. Where does it store these events for retry when the Flux is created out of a sink?
  2. Can onBackpressure* with a sink be used with Kafka, etc., to handle millions of events per second?
  3. Do buffer and onBackpressureBuffer have different underlying implementations? I want to know because, in my application, I have to batch-process millions of ongoing events and then commit them to a Kafka topic.
padma preethi
Hi everyone, I'm new to reactive programming and we are migrating our Spring Boot application to reactive. Basically, our service calls a bunch of downstream services, aggregates the data, and sends it to the app. I'm a little confused about the right way to configure WebClient for this use case. Should I create a new WebClient for each downstream service? In that case, will all these WebClients reuse the same underlying resources, like the event loop?
1 reply
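Regarding the shared-resources part of the question above, a common setup (sketch, hypothetical service names): build one base WebClient on a shared ConnectionProvider/HttpClient, then derive per-service clients via mutate(), so they all share the same event loop and connection pool.

```java
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

public class WebClientSharingDemo {

    public static void main(String[] args) {
        // One shared connection provider + HttpClient = one event loop / pool
        ConnectionProvider pool = ConnectionProvider.create("shared", 100);
        HttpClient http = HttpClient.create(pool);
        WebClient base = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(http))
                .build();

        // Per-downstream clients derived via mutate() share those resources
        WebClient serviceA = base.mutate().baseUrl("http://service-a").build();
        WebClient serviceB = base.mutate().baseUrl("http://service-b").build();
    }
}
```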