Daniil Mikhaylov

Hi there!
I have a question. Why is it recommended to wrap blocking code in Mono.fromRunnable() or Mono.fromCallable() rather than in Mono.defer() or Mono.flatMap()?
I mean, if I look at the sources of Mono.fromRunnable(), it actually does much the same as Mono.defer():

    public void subscribe(CoreSubscriber<? super T> actual) {
        MonoRunnableEagerSubscription s = new MonoRunnableEagerSubscription();
        actual.onSubscribe(s);
        if (s.isCancelled()) {
            return;
        }
        try {
            run.run();
            actual.onComplete();
        } catch (Throwable ex) {
            actual.onError(Operators.onOperatorError(ex, actual.currentContext()));
        }
    }

The blocking code runs in the subscribe() method. Does the Supplier in Mono.defer() do the same?
Does it matter if I switch threads via the .subscribeOn() operator?
Also, MonoSubscribeOn gives me pause. It does worker.schedule(() -> s.request(n)) on the parent when the subscribe happens, but I can't find what happens there: why is request() scheduled on the worker, and not subscribe()?
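For context, the practical difference usually comes down to which thread ends up inside subscribe(). A minimal sketch (assuming reactor-core on the classpath; the "blocking call" is a stand-in that just reports its thread) showing subscribeOn moving the fromCallable work onto boundedElastic:

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingWrapDemo {

    // Hypothetical blocking call, used here only to observe the executing thread
    static String blockingCall() {
        return Thread.currentThread().getName();
    }

    public static String runOnBoundedElastic() {
        // Whether wrapped in fromCallable or defer, the work runs at subscription
        // time; subscribeOn moves that subscription (and the blocking work with it)
        // onto the boundedElastic scheduler instead of the caller's thread.
        return Mono.fromCallable(BlockingWrapDemo::blockingCall)
                .subscribeOn(Schedulers.boundedElastic())
                .block();
    }

    public static void main(String[] args) {
        System.out.println("blocking call ran on: " + runOnBoundedElastic());
    }
}
```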


Hi all,

I have a function that deletes a user from both the database and Keycloak. Sometimes the user gets deleted from the database but remains in Keycloak. Does anyone have an idea what the problem could be? It only happens from time to time, and I don't know why.

        return _companyRepository.findById(companyId)
            .flatMap(companyDocument -> {
                final List<Employees> employees = companyDocument.getEmployees();
                if (employees != null && !employees.isEmpty()) {
                    final boolean isRemoved =
                        employees.removeIf(employee -> employee.getId().equals(employeeId));
                    if (isRemoved) {
                        return _companyRepository.save(companyDocument).thenReturn(employeeId);
                    }
                }
                return Mono.just(employeeId);
            })
            .flatMap(id -> _keycloakService.deleteUser(employeeRealm, id))
            .onErrorResume(e -> Mono.empty());
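One possible culprit, purely as an assumption: the trailing onErrorResume(e -> Mono.empty()) swallows any failure from deleteUser, so the database change sticks while the Keycloak failure passes silently. A small Reactor-only sketch (saveToDb and deleteFromKeycloak are hypothetical stand-ins) showing how the error disappears unless you at least log it there:

```java
import reactor.core.publisher.Mono;

public class SwallowedErrorDemo {
    static final StringBuilder log = new StringBuilder();

    // Stand-in for the repository save: always succeeds
    static Mono<String> saveToDb(String id) {
        return Mono.just(id).doOnNext(i -> log.append("saved;"));
    }

    // Stand-in for the Keycloak call: fails intermittently in the real system
    static Mono<Void> deleteFromKeycloak(String id) {
        return Mono.error(new IllegalStateException("Keycloak unavailable"));
    }

    public static String run() {
        saveToDb("42")
                .flatMap(SwallowedErrorDemo::deleteFromKeycloak)
                // The chain "succeeds" even though Keycloak deletion failed
                .onErrorResume(e -> {
                    log.append("swallowed:").append(e.getMessage()).append(';');
                    return Mono.empty();
                })
                .block();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If this matches your setup, logging (or propagating) the error in onErrorResume, or deleting from Keycloak before touching the database, would surface or avoid the inconsistency.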
Salathiel Genèse
And a StackOverflow +50 bounty

Hi, I am using SSE in a Spring WebFlux application. I have created an SSE endpoint that gets its data from Redis pub/sub, and the Redis publisher gets its data from an external WebSocket connection.

My problem is that when there is no data from the WebSocket, the connection terminates as soon as I connect to my SSE endpoint, due to the unavailability of data. My hunch is that I need to send comment heartbeats while no data is coming. How can I do that?

private fun realtimeSinkInitializer() {

        // onBackpressureBuffer by default cancels the subscription when the last client closes; we set autoCancel to false
        realtimeSink = Sinks.many().multicast().onBackpressureBuffer(1000, false)

            .map(ReactiveSubscription.Message<String, String>::getMessage)
            .map { msg ->
                { weatherData: List<WeatherDto> ->


                { err ->
                    logger.error(err) { "Error occurred redis subscription channel" }

 fun getRealtimeWeather(): Flux<ServerSentEvent<Map<String, WeatherDto>>> {
        return weatherSink.asFlux()
            .map { e -> ServerSentEvent.builder(e).build() }.onErrorResume { e ->
                logger.error(e) { "Error getting realtime weather data" }
2 replies
Mantas Matūzas
Hello, I'm trying to integrate reactive and non-reactive code. The use case is to have a non-reactive event listener that calls a reactive service. Do you think it is fine to do something like this?
  public void handle(TheEvent event) {
Salathiel Genèse


The subscribe() method returns a Disposable... If you do not keep a reference to that object, when do you call its dispose() method?


Even if you do, it doesn't really make the whole thing reactor-compliant

What you need (I think)
is a publisher [maybe within your service], to which you can PUSH TheEvent instances as they come in.
The handle(TheEvent) method from your excerpt pushes events and does not subscribe to anything.
Salathiel Genèse
Your reactiveEventService will be responsible for subscribing to that publisher OR, if it exposes that publisher (Mono, Flux or else), whatever code wants to listen can subscribe to it.
Mantas Matūzas
@SalathielGenese makes sense. I will try it.
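A minimal sketch of the suggested shape, assuming reactor-core; TheEvent, the class name, and the fail-fast emit policy are illustrative choices, not the poster's code:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class EventBridge {

    // Stand-in for the listener's event type
    record TheEvent(String payload) {}

    private final Sinks.Many<TheEvent> sink =
            Sinks.many().multicast().onBackpressureBuffer();

    // Called from the non-reactive listener: push the event, don't subscribe here
    public void handle(TheEvent event) {
        sink.emitNext(event, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    // The reactive service subscribes to this once, at wiring time
    public Flux<TheEvent> events() {
        return sink.asFlux();
    }

    public static void main(String[] args) {
        EventBridge bridge = new EventBridge();
        bridge.events().subscribe(e -> System.out.println("got " + e.payload()));
        bridge.handle(new TheEvent("hello"));
    }
}
```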
Anand Jaisy
public DiscountCommand create(Mono<String> insertId, DiscountCommand discountCommand) {
    return insertId.map(item -> {
        return new DiscountCommand(/* ... */);
    }).block(Duration.of(1000, ChronoUnit.MILLIS));
}
If insertId is null, it throws a NullPointerException and the application hangs indefinitely. How can I handle this error?
Salathiel Genèse
1./ You may want to annotate your parameter as non null

2./ You can use if/else OR Optional

Optional.ofNullable( insertId )
  .map( __ -> insertId.map( item -> new /* ... */ ) )
  .orElseGet( Mono::empty )
  .block(/* duration */);

Something like that (untested)
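An alternative sketch (also untested against the original code; DiscountCommand is a stand-in record here): guard the reference itself up front, and rely on block(Duration) returning null when the Mono completes empty, and throwing after the timeout, rather than waiting forever:

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class NullSafeBlock {

    // Hypothetical stand-in for the real DiscountCommand
    record DiscountCommand(String id) {}

    public static DiscountCommand create(Mono<String> insertId) {
        // Guard the null publisher reference itself
        if (insertId == null) {
            throw new IllegalArgumentException("insertId must not be null");
        }
        return insertId
                .map(DiscountCommand::new)
                .block(Duration.ofMillis(1000)); // null if the Mono completes empty
    }

    public static void main(String[] args) {
        System.out.println(create(Mono.just("abc")));
    }
}
```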

9 replies
Hello, how can I emit an item received through a REST endpoint without a blocking call to emit? I want to emit the item to a sink and return the item as the response.
Sinks.Many<String> inputSink = Sinks.many().multicast().onBackpressureBuffer();

public Mono<String> process(String uid) {
    inputSink.emitNext(uid, (signalType, emitResult) -> false);
    return Mono.just(uid);
}
Basically, I want to process the item received via the REST endpoint asynchronously. I would like to subscribe to the processed items on another thread as shown below.
I am subscribing to inputSink, processing each item, and emitting to outputSink.
Sinks.Many<String> outputSink = Sinks.many().unicast().onBackpressureBuffer();

public Flux<String> getProcessedItems() {
    return outputSink.asFlux();
}
Is this approach correct?

Hi folks. I'm trying to use Sinks.many().unicast().onBackpressureBuffer() for the first time and I'm getting an unexpected result. Here is how I use it:

// on Thread B

  .map {
    // doing work here
    // executed on Thread B, instead of THREAD-POOL-A

I see that flushingSink.tryEmit blocks AND queued messages are processed on the calling thread. I thought they would be queued and processed asynchronously. What am I doing wrong?
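For context, a sketch of the usual explanation (assuming reactor-core): a sink delivers to its subscribers synchronously on the emitting thread unless the consuming pipeline hops schedulers with publishOn:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

public class SinkThreadingDemo {

    // tryEmitNext hands items to subscribers on the emitting thread; publishOn
    // moves the downstream work onto another scheduler instead.
    public static List<String> run() {
        Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer();
        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(2);

        sink.asFlux()
                .publishOn(Schedulers.parallel()) // work happens on parallel-*, not the caller
                .subscribe(i -> {
                    threads.add(Thread.currentThread().getName());
                    latch.countDown();
                });

        sink.tryEmitNext(1); // returns immediately; processing is elsewhere
        sink.tryEmitNext(2);
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```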

3 replies

Does anybody know if passing Mono/Flux around the project is the right way of doing chaining? It seems like I'm losing performance on all these Reactor wrappers. In the profiler I see a call stack like this one: https://snipboard.io/fXT6KW.jpg

I see calls as deep as 150 nested Reactor invocations.

1 reply
Hi folks! What is the right answer to this question? https://stackoverflow.com/questions/31185129/parallel-dispatch-of-groupby-groups-in-reactor
Basically I'm trying to do the same: I want to process some work in parallel while always preserving the original order per group (even if a group changes thread later).
6 replies
Hi there!
I want to read a message and then execute an HTTP request. If the HTTP request returns 2xx, ack the message; otherwise, nack it. But the problem is that flatMap returns a Mono<ResponseEntity> and I can't pass the original AMQP message to map.
RabbitFlux.createReceiver().consumeManualAck(QUEUE_NAME, consumeOptions)
        .flatMap(amqpMessage -> {
            String url = new String(amqpMessage.getBody());
            return webclient.get().uri(url).retrieve().toBodilessEntity();
        })
        .map(responseEntity -> {
            if (responseEntity.getStatusCode().is2xxSuccessful()) {
                // how to pass amqpMessage here?
            } else {
                // ...
            }
            return amqpMessage; // out of scope here
        });
1 reply
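One common fix (a sketch with the HTTP call stubbed out as a Boolean; the real code would call the WebClient and inspect the status): keep the mapping nested inside flatMap so the message stays in scope, pairing each response with its message:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class AckDemo {

    // Hypothetical stand-in for the HTTP call; true means the request was 2xx
    static Mono<Boolean> call(String url) {
        return Mono.just(url.startsWith("ok"));
    }

    // Nesting the map inside flatMap keeps amqpMessage in scope, so the
    // ack/nack decision can see both the message and its response.
    public static Flux<String> process(Flux<String> messages) {
        return messages.flatMap(amqpMessage ->
                call(amqpMessage).map(success ->
                        success ? "ack:" + amqpMessage : "nack:" + amqpMessage));
    }

    public static void main(String[] args) {
        process(Flux.just("ok-1", "bad-2")).subscribe(System.out::println);
    }
}
```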
Hi everyone. I have two hot Fluxes running in parallel and one Mono. I want the Mono to always execute after both Fluxes have completed. I tried to concat the first Flux and the Mono, but the Mono executes immediately after the first Flux completes. How do I make sure the Mono always runs after both parallel Fluxes have completed? Below is just an example.

var sourceFlux = Flux.range(1, 10);
var hotFlux1 = sourceFlux.publish().refCount(2);
var firstFlux = hotFlux1.map(i -> i * 2).log("First");

// this is an IO-blocking call
var secondFlux = hotFlux1

var firstMono = Flux.just(300).log("First Mono");
var concatedFlux = Flux.concat(firstFlux, firstMono);

3 replies
In reality, the first Flux calls a service, the second Flux is a DB call, and the Mono is another service call. The Mono requires data from both the first and second Flux before calling its service.
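One way to sketch this with plain Reactor (not reproducing the refCount(2) setup above): Mono.when completes only after every given publisher completes, so chaining the Mono with then(...) guarantees it runs last:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class AfterBothDemo {

    public static List<String> run() {
        List<String> order = new CopyOnWriteArrayList<>();
        Flux<Integer> firstFlux = Flux.range(1, 3).doOnNext(i -> order.add("first:" + i));
        Flux<Integer> secondFlux = Flux.range(1, 3).doOnNext(i -> order.add("second:" + i));
        Mono<String> lastMono = Mono.fromCallable(() -> {
            order.add("mono");
            return "done";
        });

        // Mono.when subscribes to both Fluxes and completes only when both
        // have completed; then(...) subscribes to the Mono after that.
        Mono.when(firstFlux, secondFlux).then(lastMono).block();
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With the hot refCount(2) source, note that Mono.when itself provides the two subscriptions that trigger the connection.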
Ben Tomasini

I am using an approach similar to this to lift a reactor context into ThreadLocal for use by a grpc client interceptor.


I am getting some stale values due to some bugs in the way I am using it. My main concern with this approach is its dependence on the onNext call to lift a fresh context into the ThreadLocal. If used incorrectly - namely not doing a contextWrite on a stream, it can lead to leaked values from other threads.

One way I thought of to add more safety would be to clear the ThreadLocal in the onComplete or onError of the subscriber. From my initial inspection this appears to work because, so far, onComplete and onError are invoked on the same thread as onNext.

Question: Is it guaranteed that onComplete and onError are called on the same thread as onNext for a given subscriber like this?

1 reply

Is there an exhaustMap operator in reactor-core?

I have the same question. I combed the Reactor 3 API docs but can't find an equivalent operator; Google shows me this channel and conversation.

Marek Andreansky
Can you recommend a well written open source library that utilizes Reactor? I would like to read through some real code to see how I could utilize Reactor.
Salathiel Genèse
Spring Webflux !?
Netty ?
John Meehan
Looking for a way to chunk a Flux<ByteBuffer> into specified chunk sizes in bytes. Does such a solution already exist somewhere?

concatMap and external state for buffer remainders less than specified size?

John Meehan
That's the path I'm headed down. Coming from Akka Streams and Scala, keeping external state seemed like the wrong approach, but I see from other examples that it's not out of the norm here. Something like this?
    public static Flux<ByteBuffer> chunker(Flux<ByteBuffer> data, int chunkSize) {
        return Flux.defer(() -> {
            AtomicLong countDown = new AtomicLong(chunkSize);
            return data.bufferUntil(buf -> {
                if (countDown.addAndGet(-buf.remaining()) <= 0L) {
                    countDown.set(chunkSize); // reset for the next chunk
                    return true;
                }
                return false;
            }).concatMap(bufList -> {
                int size = bufList.stream().mapToInt(ByteBuffer::remaining).sum();
                ByteBuffer merged = ByteBuffer.allocate(size);
                bufList.forEach(merged::put);
                return Mono.just(merged.flip()); // make the merged buffer readable
            });
        });
    }
4 replies
Hi, how do I implement throttling in Reactor?
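A couple of building blocks that "throttling" often means (a sketch, assuming reactor-core): delayElements for time-based rate limiting, and limitRate for bounding how much is requested from upstream at once:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class ThrottleDemo {

    public static List<Integer> run() {
        return Flux.range(1, 5)
                .delayElements(Duration.ofMillis(50)) // space emissions out in time
                .limitRate(2)                          // request upstream in batches of 2
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```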
Daniel Wahlqvist
Can I do a nested groupBy? I have equipment, and for each piece of equipment I have different metrics. So I would like to groupBy equipment and then do an inner groupBy per metric type. That might be the wrong approach of course, but I'm mostly trying to get started and learn.
2 replies
josh gruenberg

is there a straightforward way to achieve a graceful flush/shutdown with an infinite source? I have a Flux from a KafkaReceiver, and at shutdown, I want to stop receiving messages from upstream, but allow all of the in-flight work to complete, and then allow the KafkaReceiver to commit its offsets (via the provided ReceiverOffset.acknowledge callback), before the KafkaReceiver closes its underlying kafka consumer. dispose seems to propagate upstream immediately, canceling all buffered work... and it also induces the KafkaConsumer to be closed before the offsets associated with that work can be committed.

I think this might naturally take the form of arranging for the request signals flowing upstream to the KafkaReceiver to be disabled at shutdown time, but I haven't found a simple way to inject that behavior into the wiring. I'm using a pretty hacky solution that involves publish to a ConnectableFlux, with a bogus consumer that stops requesting ... but this seems pretty heavyweight and unnatural. Any suggestions?
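One lighter-weight possibility, as a sketch rather than a Kafka-specific answer: takeUntilOther converts an external shutdown signal into a normal onComplete, so downstream stages can drain their in-flight work instead of being cancelled (the sink-driven source below stands in for the KafkaReceiver flux):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Sinks;

public class GracefulStopDemo {

    public static List<Integer> run() {
        // Stand-in for the infinite KafkaReceiver source
        Sinks.Many<Integer> source = Sinks.many().unicast().onBackpressureBuffer();
        Sinks.One<Void> shutdown = Sinks.one();
        List<Integer> seen = new CopyOnWriteArrayList<>();

        source.asFlux()
                // Completing `shutdown` terminates this stream with onComplete,
                // not cancel, so downstream finishes normally.
                .takeUntilOther(shutdown.asMono())
                .subscribe(seen::add);

        source.tryEmitNext(1);
        source.tryEmitNext(2);
        shutdown.tryEmitEmpty();  // graceful stop
        source.tryEmitNext(3);    // ignored; stream already completed
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Whether this lets the KafkaReceiver commit offsets before closing would still need verifying against reactor-kafka's shutdown behavior.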

3 replies

Hi everyone, I have been using Mono.zip a lot in my application, and since I have a Node.js background I assumed it works like Promise.all. But I recently found that Mono.zip won't run if one of the Monos completes empty.


fun main() {
    val m1 = Mono.just("A")
    val m2 = Mono.just("B")
    val m3 = Mono.empty<String>()

    Mono.zip(m1, m2, m3).map {
        println("Data t1: ${it.t1} t2: ${it.t2} t3: ${it.t3}")
    }.subscribe()
}

It doesn't print anything.


fun main() {
    val m1 = Mono.just("A").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m2 = Mono.just("B").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m3 = Mono.empty<String>().map { Optional.of(it) }.defaultIfEmpty(Optional.empty())

    Mono.zip(m1, m2, m3).map {
        println("Data t1: ${it.t1.get()} t2: ${it.t2.get()} t3: ${it.t3.orElse(null)}")
    }.subscribe()
}

My requirement is that I have a couple of Monos and I need the output of all of them even if one is empty. If any of the Monos fails, it can throw an error.

@OlegDokuka. any view regarding this buddy

1 reply
Salathiel Genèse
I think you're doing it right, Zakir, since a null value will result in an empty Mono.
3 replies
Daniel Wahlqvist
I have a Flux and I want to dynamically tell it how to transform the data. My idea is to join the Flux with another "command Flux" and adapt the processing depending on the last command. Is this idea similar to a real, tested pattern someone else has used? I suspect you folks can give me some hints. What I need to do is figure out how to implement this, and I need to validate whether it's a good pattern at all.
3 replies
Daniel Wahlqvist
I've been reading the API documentation and feel a little overwhelmed but also impressed and curious. Now I'm wondering; what is Reactor? Sure, I get it's a reactive programming framework but what I haven't anticipated is what such a framework must evolve into, if it's successful. I didn't realize this before but now I believe a successful reactive programming framework MUST in the long run evolve into a successful stream processing framework. That might be a strange claim, and I know that's not what many people believe. E.g. this attitude is common: https://www.tikalk.com/posts/2017/12/06/streaming-vs-reactive/ Given all this; what is Reactor and what is it evolving into?
Salathiel Genèse
That article just sparks confusion
The term “stream” [...] is a generic term that applies to Java Streams, Observables and even Iterators.
1 reply
Kirill Marchuk
Hi all. I am a bit confused with Gradle Metadata. In the .pom artifact from here: https://repo1.maven.org/maven2/io/projectreactor/reactor-core/3.3.5.RELEASE/, it says "published-with-gradle-metadata". but the folder for this release (same URL) does not contain any .module file. How come?
Hi! I have a problem using ExchangeFilterFunction in WebClient. I declared a filter that runs a backoff retry on error, but for WebClientResponseException it doesn't work: the retry mechanism doesn't run, unlike for WebClientRequestException, where it does. What am I doing wrong?
// creating the client
var client = WebClient.builder()
               .defaultHeader(CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)

// example of usage
// `decorateFunction` is like below
public interface ReactiveRequestDecorator extends ExchangeFilterFunction {

    /**
     * Decorate the given request.
     * @param request the request to decorate
     * @param <T> the type of result returned
     * @return the decorated request
     */
    <T> Mono<T> decorate(Mono<T> request);

    @Override
    default Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        return decorate(next.exchange(request));
    }
}

// example implementation
public <T> Mono<T> decorate(Mono<T> request) {
    return Mono.defer(() -> request)
            .retryWhen(Retry.backoff(3, Duration.ofMillis(200)));
}
1 reply
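A hedged guess at the cause: next.exchange(...) completes normally even for 4xx/5xx responses (the WebClientResponseException is raised later by retrieve()/onStatus, downstream of the filter), so the filter's retryWhen never sees an error signal, while connection-level WebClientRequestExceptions do occur inside it. A Reactor-only sketch of the underlying rule, that retryWhen reacts only to onError:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RetrySignalDemo {

    // An upstream that fails: retryWhen re-subscribes on each error signal
    public static int attemptsForError() {
        AtomicInteger attempts = new AtomicInteger();
        Mono.defer(() -> {
                    attempts.incrementAndGet();
                    return Mono.error(new RuntimeException("boom"));
                })
                .retryWhen(Retry.backoff(2, Duration.ofMillis(10)))
                .onErrorResume(e -> Mono.empty())
                .block();
        return attempts.get(); // 1 original try + 2 retries
    }

    // An upstream that emits a value (like exchange() on a 500): no error, no retry
    public static int attemptsForValue() {
        AtomicInteger attempts = new AtomicInteger();
        Mono.defer(() -> {
                    attempts.incrementAndGet();
                    return Mono.just("a response, even if its status is 5xx");
                })
                .retryWhen(Retry.backoff(2, Duration.ofMillis(10)))
                .block();
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsForError() + " vs " + attemptsForValue());
    }
}
```

If that diagnosis applies, mapping error statuses to an error signal inside the filter (e.g. via ClientResponse.createException()) before the retryWhen should make response errors retryable; that's an assumption to verify against your setup.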
Patrick Gotthard

Hi all. Is there any difference regarding memory usage when using share() in the following code?

final Path file = [...];
final Flux<DataBuffer> buffer = this.webClient.get().uri(url).retrieve().bodyToFlux(DataBuffer.class).retry(3);

// Option 1 without calling share()
DataBufferUtils.write(buffer, file, StandardOpenOption.CREATE).block();

// Option 2 with calling share()
DataBufferUtils.write(buffer, file, StandardOpenOption.CREATE).share().block();

My application has to download a lot of files and I want to use as little RAM as possible.


Hi all, has anyone used Flow, coroutines, and WebFlux together in production? I'd like to know what the experience is like. I have a Spring WebFlux application and I want to convert it to Flow with WebFlux. The benefit would be imperative-looking asynchronous code.


4 replies
Ghenadii Batalski

Hello all, I need to investigate the cause of a subscription cancellation in a WebFlux application. How can I log/determine the cause of a cancel event? The following code is called from the RestController:

 Mono.fromCallable(() -> {
            // do some blocking stuff
            log.info("creating some value");
            return someValue;
        })
        .doOnNext(p -> log.info("someValue created: {}", p))
        .doOnError(throwable -> log.error("Something went wrong on someValue creation", throwable))
        .doOnCancel(() -> log.error("cancelled on something ..."))

and produces the following log:

"message":"| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblyConditionalSubscriber)","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"nioEventLoopGroup-3-2"
"message":"| request(unbounded)","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"nioEventLoopGroup-3-2"
"message":"onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)","logger_name":"reactor.Mono.CallableOnAssembly.223","thread_name":"nioEventLoopGroup-3-2"        
"message":"| cancel()","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"reactor-http-epoll-4",
"message":"cancelled on something ...","logger_name":"domain.Service","thread_name":"reactor-http-epoll-4"
"message":"creating some value","logger_name":"domain.Service2", "thread_name":"nioEventLoopGroup-3-2"

any help would be appreciated! Regards, Gena

8 replies
Hi, I have a question about how Spring WebClient works in a typical Spring MVC application. I learned that Spring Cloud Gateway cannot run on MVC because it relies on Spring WebFlux, and that we should not mix runtime models like Netty and Tomcat. So my question is: how can WebClient run efficiently in a Spring MVC application?
12 replies