Devbrat Dash
and returns 200 to the client
what am I doing wrong?
because if I debug the flow at onErrorResume(error -> ..), the error variable holds the error, so I can't understand why it still throws
below is the complete error
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
Knut Schleßelmann
Hi! I need to process a (possibly really huge) flux and perform some side effect when a characteristic of the elements changes. I think windowUntilChanged(keyExtractor) is possibly the right operator for this, but how can I access the current key of a sub-flux when it completes? I think I'm looking for something like windowUntilChanged that returns a GroupedFlux so I have an idea of the current key. Any suggestions on this? Currently I'm trying a nested windowUntilChanged { groupBy { g -> g.doStuff.doOnComplete { print(g.key()) } } } but this feels wrong :-/
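One possible alternative (a sketch, not necessarily the right answer if the runs are too large to buffer): bufferUntilChanged(keyExtractor) emits each consecutive run as a List, so the "current key" is simply the key extractor applied to the first element of the buffer.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Sketch: each buffer is one run of equal keys; read the key from the first element
List<List<Integer>> runs = Flux.just(1, 1, 2, 2, 2, 3)
        .bufferUntilChanged(i -> i)   // key extractor; here the element itself
        .doOnNext(run -> System.out.println("run of key " + run.get(0) + " completed"))
        .collectList()
        .block();
// runs -> [[1, 1], [2, 2, 2], [3]]
```

If buffering a whole run in memory is not acceptable, the windowUntilChanged + doOnComplete workaround from the question still seems like the pragmatic choice.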
Daanish Sarguru
Hi everyone, I was referring to this issue to better understand ReadTimeout vs ResponseTimeout
Can anyone explain this point?
They apply even when an HTTP request is not being processed. For example, they could cause a connection sitting in the connection pool to be closed, even though it might be able to be used a split-second later by another request.
1 reply
khizar khan

please review my code, I have this code on the server side

    public Flux<DataBuffer> stream(Mono<String> nameMono) throws IOException {
        if (resource.exists()) System.out.println("true");

        Flux<DataBuffer> dataBuffer = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 1048576)
                .doOnNext(s -> System.out.println("Sent"));
        return dataBuffer;
    }
the above code sends a file as Flux&lt;DataBuffer&gt;, while on the client side:

                .doOnNext(dataBuffer -> {
                    try {
                        service.uploadFile(path, dataBuffer);
                    } catch (IOException e) {
                        // rethrow so the stream errors instead of silently swallowing the failure
                        throw new UncheckedIOException(e);
                    }
                })
                .doOnComplete(() -> System.out.println("Upload Complete"))

the above code calls the server and requests the stream, which sends the file in bytes, but the file created on the client side shows 0 bytes. please help
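One common cause of a 0-byte file is doing the write as a side effect in doOnNext without consuming and flushing the buffers. A hedged sketch of letting DataBufferUtils drive the write instead (here "body" stands in for the Flux&lt;DataBuffer&gt; returned by the client call):

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;

// "body" simulates the Flux<DataBuffer> received from the server
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
Flux<DataBuffer> body = Flux.just(factory.wrap("hello".getBytes(StandardCharsets.UTF_8)));

Path target = Files.createTempFile("download", ".bin");
// DataBufferUtils.write subscribes, writes every buffer to the file, and releases the buffers
DataBufferUtils.write(body, target, StandardOpenOption.WRITE).block();
```

The block() is only for the demo; in a real pipeline you would return or chain the Mono&lt;Void&gt; it produces.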

R. Kyle Murphy
Hello, hopefully this is an easy one for someone to answer, but I feel like I'm going insane here. I've got a reactive chain that basically looks like someMethod().onErrorResume(e -> Mono.empty()).subscribeOn(scheduler).subscribe(). The thing driving me crazy is that when I breakpoint inside of the onErrorResume it's executing in a thread not owned by the scheduler, which is counter to everything I thought I knew about reactor. Can someone explain what I'm misunderstanding about how this works?
1 reply
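A sketch of why this happens: subscribeOn only controls where the subscription (and any work performed at subscription time) runs; error signals travel on whatever thread the failing source emits them from, and the onErrorResume fallback runs there unless a publishOn moves it. Two contrasting cases, using reactor-core only:

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Case 1: the error is produced at subscription time, so it surfaces on the subscribeOn scheduler
String atSubscribe = Mono.<String>error(new IllegalStateException("boom"))
        .onErrorResume(e -> Mono.just(Thread.currentThread().getName()))
        .subscribeOn(Schedulers.boundedElastic())
        .block();
// atSubscribe starts with "boundedElastic"

// Case 2: the error is produced later by a timer, so onErrorResume runs on the timer's thread
String later = Flux.interval(Duration.ofMillis(10))
        .take(1)
        .concatMap(i -> Mono.<String>error(new IllegalStateException("boom")))
        .onErrorResume(e -> Mono.just(Thread.currentThread().getName()))
        .subscribeOn(Schedulers.boundedElastic())
        .blockLast();
// later starts with "parallel" (the default timer scheduler), despite the subscribeOn
```

If the fallback must run on a specific scheduler, publishOn before (or inside) the onErrorResume is the tool for that.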
javax.net.ssl.SSLException: failure when writing TLS control frames
1 reply
khizar khan
hey, can I create a bi-directional channel between two clients through the server?
Violeta Georgieva
Hi All,
We are discussing Reactor Next Generation reactor/reactor#703
We would like to hear from you what you would like to see in it :)
Andrey Filyanin
Hi, we use Spring WebFlux to be reactive, and R2DBC to stay reactive all the way. The only bad thing is we use GraphQL, which supports only Future, so we call toFuture on the Mono or Flux before giving back a result. We ran into a performance issue: our application runs inside k8s and I see that the number of cores is 1, so when we run a long-running query it impacts the health check and k8s kills the pod. Is there a way to force WebFlux to use more threads, and could it help anyway? Thank you.
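Not a full answer, but one knob that may be relevant: Reactor Netty sizes its event loop from the detected core count, and that can be overridden with a system property, set before Netty initializes. Whether more event-loop threads actually help a long CPU-bound query is doubtful; offloading the heavy work to Schedulers.boundedElastic() so the health-check endpoint stays responsive is usually the better lever.

```java
// Must run before Reactor Netty creates its event loops (e.g. first thing in main)
System.setProperty("reactor.netty.ioWorkerCount", "4");
```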
khizar khan
what is this error
reactor.core.Exceptions$ErrorCallbackNotImplemented: ApplicationErrorException (0x201): Destination 'channel' does not support REQUEST_STREAM. Supported interaction(s): [REQUEST_CHANNEL]

Getting the below error a couple of times while uploading the same multipart file.
Caused by: java.io.IOException: Out of size: 990 > 989 at io.netty.handler.codec.http.multipart.AbstractMemoryHttpData.addContent(AbstractMemoryHttpData.java:104)
io.netty.handler.codec.http.multipart.HttpPostRequestDecoder$ErrorDataDecoderException: java.io.IOException: Out of size: 990 > 989

Any guidance will be very much appreciated.

3 replies
Hi! Quick question about interacting with non-reactive code. I'd like to be able to 'pause' upstream demand (request) based on something outside of the reactive flow. For example, let's pretend there's some old-school 'listener' which can give a ready / not-ready signal. When this goes not-ready, I'd like to stop further upstream requests, and 'switch them back on' when it signals ready again. I imagine this to be very similar to limitRate, where an infinite request is rate limited (with a given prefetch etc.), but with the additional ability to pause and resume subsequent requests based on the external signal described above. I'm trying to work out the most idiomatic way to implement this. It seems I could do it with a Subscriber implementation when I ultimately subscribe to the Flux, but I wonder whether there is a better, more idiomatic way to do this where I can 'just plug something in' (a bit like being able to do limitRate). Any pointers?
1 reply
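One sketch of a pluggable gate (names are made up): keep the latest ready/not-ready signal in a replaying sink, and let each element wait for the gate to read true before continuing. With a prefetch of 1, concatMap is what actually holds back upstream demand while the gate is closed.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// The external listener would call gate.tryEmitNext(true/false)
Sinks.Many<Boolean> gate = Sinks.many().replay().latest();
gate.tryEmitNext(true); // start open

List<Integer> out = Flux.range(1, 5)
        // each element waits until the most recent gate value is "ready"
        .concatMap(item -> gate.asFlux().filter(ready -> ready).next().thenReturn(item), 1)
        .collectList()
        .block();
// out -> [1, 2, 3, 4, 5]
```

The custom-Subscriber route mentioned in the question works too; this version just keeps the pause logic as an ordinary operator in the chain.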
How can I update the Context in a stream using a WebFilter, and how can I access the same Context in cross-cutting aspects?
I want to get host details using the WebFilter and then put that information in the ReactiveTransactionManager.
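The writing side in a WebFilter is chain.filter(exchange).contextWrite(...); anything downstream (services, aspects) reads it back with deferContextual. The Context mechanics in a self-contained sketch (the "host" key is just an example name):

```java
import reactor.core.publisher.Mono;

// In a WebFilter you would return: chain.filter(exchange).contextWrite(ctx -> ctx.put("host", host));
// Downstream code reads the value back like this:
String result = Mono.deferContextual(ctx -> Mono.just("host=" + ctx.get("host")))
        .contextWrite(ctx -> ctx.put("host", "example.org"))
        .block();
// result -> "host=example.org"
```

Note the Context propagates upstream from the subscriber, so the contextWrite in the filter is visible to everything executed inside the filtered chain.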
Rob Gillen
Hi all! Can anyone tell me if Reactor provides a way to communicate from the subscriber to upstream publishers? I'm not exactly sure if that makes sense. E.g. if a subscriber does something with a value passed down to it and afterward I wanted to communicate some state about that value back to the publisher (or some other intermediary), how would I do this? The reactive Context doesn't appear to be what would be used for this. My initial thought is to wrap or pair the objects being published with something that can perform the operation from the subscriber. This seems like a messy solution, but also workable. Is there some other solution that I might be missing?
Mantas Matūzas
Hello, can somebody explain whether this code block is blocking or not? If it is, how should I integrate ObjectMapper into the call chain correctly?
    return Mono.just(apiErrorTranslator.translate(throwable))
            .handle((errorResponse, sink) -> {
              log.error("Error occurred while processing request", throwable);

              var httpResponse = exchange.getResponse();

              try {
                var bytes = objectMapper.writeValueAsBytes(errorResponse);
                var buffer = httpResponse.bufferFactory().wrap(bytes);
                sink.next(buffer);
              } catch (JsonProcessingException e) {
                sink.error(e);
              }
            });
8 replies
Does anyone have suggestions regarding a job scheduler like Quartz for the reactive universe? I have used Quartz in Spring MVC, but in my current WebFlux setup I use reactive Cassandra. I need a distributed job scheduler for cron jobs.
Jonathan Gray
Hi, I'm using the reactor-rabbitmq RabbitFlux Receiver and consumeManualAck with the default ConsumeOptions. We have auto recovery enabled on the RabbitMQ connection, but we faced a network connection issue which appears to have been auto-recovered, yet no further consumption occurred. I see the default ConsumeOptions exception handler retries for 10 seconds; what happens when that time passes? Is it advisable to set that time to a very large value if I know that the auto-recovery is there?
Hey Everyone! I've asked this question on rsocket gitter already, but they said that it's rather related to reactor so now i'm here.
    public Flux<Payload> requestStream(Payload payload) {
        //first flux
        return Flux.interval(Duration.ofMillis(1000))
                .map(time -> DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now()));
    }

        //second flux
        return Flux.create(fluxSink -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000); // presumably elided in the original paste, given the catch below
                    fluxSink.next(DefaultPayload.create(String.format("Number %s", i)));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            fluxSink.complete();
        });
how come the first flux creates one element and sends it to the client right away, while the second one generates all 10 elements and only then sends them to the client? also, how would I go about doing something similar to the first flux? please help
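The difference is that Flux.interval schedules each tick on a timer thread and emits as it goes, while the for loop inside Flux.create runs to completion on the subscribing thread before control returns, so everything is flushed together. A non-blocking way to get the one-element-at-a-time behaviour (a sketch, using plain strings instead of Payload):

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Emits one element every 100 ms instead of looping with Thread.sleep
List<String> out = Flux.range(0, 10)
        .delayElements(Duration.ofMillis(100))
        .map(i -> "Number " + i)
        .collectList()
        .block();
// out -> ["Number 0", ..., "Number 9"]
```

delayElements hands emission to a timer scheduler, so each element is sent to the client as it is produced.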
I'm trying to understand the Context stuff and wondering how/if it works deep in library code. I guess I might be asking if a threadlocal is set for the time when something is running.
Manish Malik
I am moving from Akka actor reactive programming to WebFlux. Any documentation or references that can help? Thanks in advance.
Zane XiaoYe


Mono<Address> address = ..;
Mono<Person> person = ..;
Mono.zip(address, person).flatMap(t -> {...});

both address and person could be empty, and if either is empty, I need to log a warning. Where should I put the log.warn()?

3 replies
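Mono.zip completes empty as soon as either source is empty, so the flatMap never sees the empty case; the warning has to be attached to each input before the zip. A sketch with stand-in sources:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;

List<String> warnings = new CopyOnWriteArrayList<>();

Mono<String> address = Mono.empty();     // stand-ins for the real sources
Mono<String> person = Mono.just("Alice");

Mono<String> loggedAddress = address
        .switchIfEmpty(Mono.<String>fromRunnable(() -> warnings.add("address is empty")));
Mono<String> loggedPerson = person
        .switchIfEmpty(Mono.<String>fromRunnable(() -> warnings.add("person is empty")));

String out = Mono.zip(loggedAddress, loggedPerson)
        .map(t -> t.getT1() + "/" + t.getT2())
        .defaultIfEmpty("at least one input was empty")
        .block();
// out -> "at least one input was empty"; warnings contains "address is empty"
```

Mono.fromRunnable runs the side effect and then completes empty, so the zip still ends empty and the defaultIfEmpty (or switchIfEmpty) downstream handles the overall fallback.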
Zane XiaoYe
Hi, how can I build a PDF or EPUB from https://github.com/reactor/reactor-core/tree/main/docs/asciidoc? Are there any existing commands?
12 replies
Zane XiaoYe
Brigen Tafilica
Hello everyone.
I am trying to read all accounts from a Mongo collection (a LOT, like 1 million, and this is mandatory),
then I need to do some I/O-bound operations on each account and then update them in the Mongo collection again.
Can someone help me with the best approach (maybe some demo code) to do this? I'm struggling with the performance.
Also, we might want to scale vertically, going from 32 CPU cores to 64. I assume that since we have I/O-bound processing, scaling vertically will not help much, or am I wrong? Is there some other implementation that takes this scaling into consideration too? Thanks a lot in advance.
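A hedged sketch of the usual shape (the range and the doubling below are stand-ins for repository.findAll() and the real per-account work): stream with the reactive driver, cap flatMap concurrency, and keep blocking or heavy work off the event loop. If the per-account work turns out to be CPU-bound rather than I/O-bound, more cores do help, since Schedulers.parallel() is sized from the core count.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// "accounts" stands in for repository.findAll(); the callable stands in for process + save
Flux<Integer> accounts = Flux.range(1, 1_000);

List<Integer> updated = accounts
        .flatMap(account ->
                Mono.fromCallable(() -> account * 2)          // the expensive per-account step
                        .subscribeOn(Schedulers.boundedElastic()),
                64)                                            // cap in-flight work; tune this
        .collectList()
        .block();
// 1,000 results, processed at most 64 at a time
```

The concurrency argument is the main performance dial here: too low and you under-utilize the database, too high and you overwhelm it.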
Pieter Martin

I am struggling to understand the parallel behavior.

    public void test() throws InterruptedException {
        Flux.range(1, 1_000)
                .subscribe((a) -> {
                    System.out.println(Thread.currentThread().getName() + " " + a);
                });
    }


parallel-1 1
parallel-1 2
parallel-1 3
...
parallel-1 998
parallel-1 999
parallel-1 1000

I was expecting the subscribe function to execute on many threads, depending on my CPU.
Instead they all execute on parallel-1.
I suspect I'm doing something wrong?

1 reply
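This is expected: a Flux is sequential by default, and the subscribe consumer runs on whatever single thread emits the signals (here parallel-1). To actually fan out across cores you have to ask for it explicitly, e.g. (a sketch):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

Set<String> threads = ConcurrentHashMap.newKeySet();

Flux.range(1, 1_000)
        .parallel()                      // split into rails (one per core by default)
        .runOn(Schedulers.parallel())    // give each rail its own thread
        .doOnNext(a -> threads.add(Thread.currentThread().getName()))
        .sequential()
        .blockLast();
// on a multi-core machine, "threads" now holds several parallel-N names
```

Without the runOn, parallel() only divides the data; it does not move work onto other threads.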
Ketan Vishwakarma
What should be the reactive equivalent of SecurityContextHolder.getContext().getAuthentication()?

Hi, what is the recommended concurrency in the case of flatMap? I bumped into this question, which says the default value is 256.

My question is: in which scenarios do we need to explicitly set the concurrency in flatMap?

13 replies
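The default (256) mostly matters when the inner publishers are expensive: the concurrency argument is the number of inner sources subscribed at once, so you lower it to protect a downstream resource (database connections, a remote API) or raise it for many cheap calls. A small sketch that makes the cap observable (the counters are just instrumentation):

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

AtomicInteger inFlight = new AtomicInteger();
AtomicInteger maxInFlight = new AtomicInteger();

Flux.range(1, 100)
        .flatMap(i -> Mono.delay(Duration.ofMillis(10)).thenReturn(i)
                        .doOnSubscribe(s -> maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max))
                        .doFinally(sig -> inFlight.decrementAndGet()),
                8)  // at most 8 inner Monos in flight at once
        .blockLast();
// maxInFlight.get() never exceeds 8
```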


@OlegDokuka any suggestions?

Mico Piira
Is it possible to use BlockHound inside real Tomcat? I just get a bunch of IllegalAccessErrors when trying to initialize BlockHound in a web application running inside Tomcat.
Zane XiaoYe
Should I use Mono for non-async code?
Sometimes I just want to use it as an Optional for easier composition. Should I use Mono as an Optional?
Burim Krasniqi
senderOptions = senderOptions
    .producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "SampleTxn");

// sender = KafkaSender.create(senderOptions)
sender.sendTransactionally(source.map(r -> Flux.fromIterable(transform(r))))
      .concatMap(r -> r)
      .doOnError(e -> log.error("Send failed, terminating.", e))
      .doOnNext(r -> log.debug("Send completed {}", r.correlationMetadata()));
The Kafka producer cannot be reused when I use it with a transaction. It complains that the same transactionId cannot be used in different calls at the same time (it throws an exception only in the case of parallel calls). I can solve it by creating different producers with different transactionIds, but this is not that efficient, as it consumes more threads internally. Is there any alternative for using the same Kafka producer - KafkaSender.create(senderOptions) - with different transactionIds, without having to create and close it?
1 reply
Hi - quick question. Let's say I've got a flux which will produce millions of entries. Each entry has some id: if we have not seen this id yet in the flow, we need to do some extra work (e.g. retrieve something); otherwise we just get on with processing the entry. There are a large number of ids, so groupBy doesn't seem right. What is the Reactor way to deal with this? I could use some state, like a set, to figure this out, but I'm just wondering if there is a more idiomatic approach.
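For pure deduplication, distinct(keySelector) does the "state set" bookkeeping internally, but since here the first sighting needs extra work rather than filtering, an explicit concurrent set is a reasonable fit. A sketch with made-up names (the fetchedFor list stands in for the "retrieve something" step):

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Set<String> seen = ConcurrentHashMap.newKeySet();
List<String> fetchedFor = new CopyOnWriteArrayList<>(); // records the "extra work"

List<String> processed = Flux.just("a", "b", "a", "c", "b")
        .concatMap(id -> {
            // Set.add returns true only for the first occurrence of the id
            Mono<Void> extra = seen.add(id)
                    ? Mono.fromRunnable(() -> fetchedFor.add(id)) // e.g. retrieve something
                    : Mono.empty();
            return extra.thenReturn(id);                          // then process as usual
        })
        .collectList()
        .block();
// processed -> [a, b, a, c, b]; fetchedFor -> [a, b, c]
```

With millions of ids the set's memory footprint is the trade-off either way; distinct would pay the same cost internally.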
Brigen Tafilica

Bumping my question
Hi, it's me again, sorry to bother everyone.

I am trying to read all accounts from a Mongo collection (a LOT, like 1 million, and this is mandatory),
then I need to do some I/O-bound operations on each account (sum all values inside a big array, and some other arithmetic operations like multiplication and Math.pow) and then update them in the Mongo collection again.
Can someone help me with the best approach (maybe some demo code) to do this? I'm struggling with the performance.
Also, we might want to scale vertically, going from 32 CPU cores to 64. I assume that since we have I/O-bound processing, scaling vertically will not help much, or am I wrong? Is there some other implementation that takes this scaling into consideration too? Thanks a lot in advance.

josh gruenberg

hi there. I'd appreciate some advice: I have a flux of timestamped events (from kafka), and I need to collate them into event-time windows, with a configured accumulation timeout. the events aren't strictly ordered, but I can tolerate windows closing prematurely: stragglers that arrive after their previously-open window has closed can just be allocated into a new, separate window for the same time period.

I seek an appropriate strategy for achieving this without leaking resources or degrading performance as the cardinality of event-time windows grows without bound.

at first glance, something like the following seems like it might work:

events.groupBy(event -> event.timestamp().truncatedTo(windowDuration))
        .flatMap(group -> group.window(windowAccumulationTimeout))

however, I'm concerned about the implications of groupBy for that unbounded stream of groups. I suspect this might be problematic because each group will remain in the groupBy state forever, even after the window-timeout expires for the last straggling event in each window.

am I wrong to be concerned about this? is there a better way to do what I need?

7 replies
Salathiel Genèse

Hey guys

I trust you all are thriving.

For reusability concerns, I run my validation on the service layer, which returns Mono.error( constraintViolationException )...

So that my web handlers merely forward the unmarshalled domain to the service layer.

So far, so great.

But how do I advise (AOP) my web handlers so that they return HTTP 422 with the formatted constraint violations?

WebExchangeBindException only handles exceptions thrown synchronously.

My AOP advice triggers an error because:

  • my web handler returns Mono<DataType>
  • but my advice returns a ResponseEntity

And if I wrap my response entity (from the advice) into a Mono<ResponseEntity>, I get an HTTP 200 OK with the response entity serialized :(

Salathiel Genèse
The link to the SO question, with code excerpt - https://stackoverflow.com/q/69519724/3748178
Zane XiaoYe
Hi, I’m working on rewriting a periodic pulling job (using the @Scheduled annotation) into a reactive stream. The system pulls messages from a message queue (not Kafka). It is thread safe, and the previous implementation uses multiple clients/threads to do the pulling. After the messages are pulled, we need to do some processing (enrichment, sending to another queue) and then ack back to the message queue.
What is the equivalent operator/API to do the same work using Reactor?
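One common bridge (a sketch with a stand-in in-memory queue): wrap one blocking pull in Mono.fromCallable, keep it on boundedElastic so event loops never block, and use repeat() to pull again after each completion; flatMap then gives the concurrent enrich/forward/ack step.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Stand-in for the real thread-safe message-queue client
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("m1", "m2", "m3"));
List<String> acked = new CopyOnWriteArrayList<>();

Flux.defer(() -> Mono.fromCallable(() -> Optional.ofNullable(queue.poll()))) // one pull per (re)subscription
        .subscribeOn(Schedulers.boundedElastic())  // keep the blocking client off event loops
        .repeat()                                  // pull again after each completion
        .takeWhile(Optional::isPresent)            // demo-only stop condition: queue drained
        .map(Optional::get)
        .flatMap(msg -> Mono.fromRunnable(() -> acked.add(msg)), 4) // enrich/forward/ack here
        .blockLast();
// acked -> contains m1, m2, m3
```

In the real system you would drop the takeWhile (the stream runs forever), subscribe several of these pipelines or raise the flatMap concurrency to replace the old multi-threaded pullers, and put the actual ack call in the flatMap step.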
Daniil Mikhaylov

Hi there!
I have a question. Why is it recommended to run blocking code in Mono.fromRunnable() or Mono.fromCallable() rather than in Mono.defer() or mono.flatMap()?
I mean, if I look at the sources of Mono.fromRunnable(), it actually does much the same as Mono.defer():

    public void subscribe(CoreSubscriber<? super T> actual) {
        MonoRunnableEagerSubscription s = new MonoRunnableEagerSubscription();
        actual.onSubscribe(s);
        if (s.isCancelled()) {
            return;
        }
        try {
            run.run();
            actual.onComplete();
        } catch (Throwable ex) {
            actual.onError(Operators.onOperatorError(ex, actual.currentContext()));
        }
    }
The blocking code runs in the subscribe() method. Doesn't the Supplier in Mono.defer() do the same?
Does it matter if I switch threads with the .subscribeOn() operator?
Also, MonoSubscribeOn gives me pause for thought. It does worker.schedule(() -> s.request(n)) on the parent when subscribe happens. But I can't figure out why request() is scheduled on the worker and not subscribe()?


Hi all,

I have a function that deletes a user both from the database and from Keycloak. Sometimes it happens that the user gets deleted from the database but remains in Keycloak. Does anyone have an idea what the problem could be? It only happens from time to time, and I do not know why.

        return _companyRepository.findById(companyId)
            .flatMap(companyDocument -> {
                final List<Employees> employees = companyDocument.getEmployees();
                if (employees != null && !employees.isEmpty()) {
                    final boolean isRemoved =
                        employees.removeIf(employee -> employee.getId().equals(employeeId));
                    if (isRemoved) {
                        return _companyRepository.save(companyDocument).thenReturn(employeeId);
                    }
                }
                return Mono.just(employeeId);
            })
            .flatMap(employeeId -> _keycloakService.deleteUser(employeeRealm, employeeId))
            .onErrorResume(e -> Mono.empty());
Salathiel Genèse
And a StackOverflow +50 bounty

Hi, I am using SSE in a Spring WebFlux application. I have created an SSE endpoint that gets data from Redis pub/sub, and the Redis publisher gets its data from an external WebSocket connection.

My problem is that when there is no data from the WebSocket, as soon as I try to connect to my SSE endpoint the connection terminates due to the unavailability of data. My hunch is that I need to send comment heartbeats when data is not coming. How can I do that?

    private fun realtimeSinkInitializer() {
        // onBackpressureBuffer by default cancels the subscription when the last client closes; we set autoCancel to false
        realtimeSink = Sinks.many().multicast().onBackpressureBuffer(1000, false)

        // the Redis subscription (receiver elided in the original paste)
        redisSubscription
            .map(ReactiveSubscription.Message<String, String>::getMessage)
            .map { msg ->
                // ... deserialize msg into weather data (elided) ...
            }
            .subscribe(
                { weatherData: List<WeatherDto> ->
                    // ... emit into the sink (elided) ...
                },
                { err ->
                    logger.error(err) { "Error occurred on the Redis subscription channel" }
                }
            )
    }

    fun getRealtimeWeather(): Flux<ServerSentEvent<Map<String, WeatherDto>>> {
        return weatherSink.asFlux()
            .map { e -> ServerSentEvent.builder(e).build() }
            .onErrorResume { e ->
                logger.error(e) { "Error getting realtime weather data" }
                Flux.empty()
            }
    }
2 replies
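For the heartbeat: ServerSentEvent supports comment-only events, so one option (a sketch in Java; the dataEvents source stands in for the sink-backed weather stream) is to merge a periodic comment event into the data stream. Comment events keep the connection alive but deliver no data to the client handler.

```java
import java.time.Duration;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Flux;

// Comment-only events; SSE clients ignore them but the connection stays open
Flux<ServerSentEvent<String>> heartbeat = Flux.interval(Duration.ofSeconds(15))
        .map(i -> ServerSentEvent.<String>builder().comment("keep-alive").build());

// Stand-in for the real sink-backed stream
Flux<ServerSentEvent<String>> dataEvents = Flux.just(
        ServerSentEvent.builder("some-data").build());

Flux<ServerSentEvent<String>> out = Flux.merge(dataEvents, heartbeat);
```

Returning the merged flux from the SSE endpoint means the connection gets a frame at least every 15 seconds even when the WebSocket is silent.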
Mantas Matūzas
Hello, I'm trying to integrate reactive and non-reactive code. The use case is to have a non-reactive event listener call a reactive service. Do you think it is fine to do something like this?
  public void handle(TheEvent event) {
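That shape is generally fine as long as nothing in the listener blocks. A self-contained sketch of the pattern (the service call is simulated; in the real code it would be the injected reactive service):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import reactor.core.publisher.Mono;

List<String> processed = new CopyOnWriteArrayList<>();

// Stand-in for the reactive service call
java.util.function.Function<String, Mono<Void>> service =
        event -> Mono.fromRunnable(() -> processed.add(event));

// The non-reactive listener body: subscribe() starts the pipeline (a Mono is lazy);
// the error callback replaces what would otherwise be a silently dropped error
Consumer<String> handle = event -> service.apply(event)
        .doOnError(e -> System.err.println("Failed to process " + event + ": " + e))
        .subscribe();

handle.accept("TheEvent");
// processed -> ["TheEvent"]
```

The main caveats: without a subscribe (or block) nothing happens, and fire-and-forget means the listener cannot report failures back to its caller.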
Salathiel Genèse


The subscribe() method returns a Disposable... If you do not keep a reference to that object, when do you call its dispose() method?
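For finite sequences you often don't need the reference, since they clean up on completion; keeping the Disposable matters for infinite or long-lived subscriptions, where dispose() is the cancellation hook (e.g. on shutdown). A sketch:

```java
import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

// An infinite subscription: keep the Disposable so it can be cancelled later
Disposable subscription = Flux.interval(Duration.ofMillis(50))
        .subscribe(i -> System.out.println("tick " + i));

// ... later, e.g. in a shutdown hook or @PreDestroy method:
subscription.dispose(); // cancels the timer and releases its resources
```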