@bclozel thank you!!
Oemer Genc

Hi, currently I need some help with GroupedFlux. What I want is to group messages by ID and then process a specific number of messages for each ID, say 3 messages per ID.
I have the following code:

                .groupBy(m -> m.key)
                .concatMap { it.collectList() }
                .flatMap { Flux.fromIterable(it.take(3)) }
                .subscribe { println(it.key) }

This works; however, if the grouped fluxes contain a lot of messages, the collectList() takes very long.
As I am only interested in 3 messages of each group this seems unnecessary, but I cannot find a proper way to prevent it.
Something like ....concatMap { it.take(3).collectList() } always results in the following error:

Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.

Does anyone have an idea how this can be done?
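One way to avoid buffering whole groups is to skip groupBy entirely and keep a per-key counter in a stateful filter; this sidesteps both collectList() and the subtlety that take(3) cancels a group (after which a later element with the same key would start a fresh group). A minimal Java sketch; the string messages and the key extraction are placeholders, not the asker's types:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;

public class FirstNPerKey {
    // keep only the first 3 elements per key without buffering whole groups;
    // a stateful filter sidesteps collectList() entirely
    static List<String> firstThreePerKey(Flux<String> messages) {
        Map<Character, Integer> seen = new ConcurrentHashMap<>();
        return messages
                .filter(m -> seen.merge(m.charAt(0), 1, Integer::sum) <= 3)
                .collectList()
                .block(); // block only for the demo
    }

    public static void main(String[] args) {
        System.out.println(firstThreePerKey(
                Flux.just("a1", "a2", "a3", "a4", "b1", "b2", "b3", "b4", "b5")));
        // [a1, a2, a3, b1, b2, b3]
    }
}
```

If grouping is actually needed (e.g. for per-key parallelism), `flatMap(g -> g.take(3))` also works, but note the re-created-group caveat above.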

Leonard Brünings

Hi, I have this code to calculate a regularly updating status as hot producer.

                Flux.interval(Duration.ZERO, properties.getUpdateInterval())
                        .onBackpressureDrop(intervalCount -> {
                            log.warn("Too many connections pending, dropped status update request {}", intervalCount);
                        })
                        .concatMap(ignore -> updateStatus())

What is the correct way to stop this? takeWhile(ignore -> running), or .takeUntilOther(killSwitch) with private DirectProcessor<String> killSwitch = DirectProcessor.create();

                Flux.interval(Duration.ZERO, properties.getUpdateInterval())
                        .onBackpressureDrop(intervalCount -> {
                            log.warn("Too many connections pending, dropped status update request {}", intervalCount);
                        })
                        .concatMap(ignore -> updateStatus())
                        .takeWhile(ignore -> running)

Or something else?

2 replies
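For the stop question above, a dedicated completion signal plus takeUntilOther is a common pattern. A sketch using the Sinks API (available from Reactor 3.4; on 3.3 the DirectProcessor variant in the question works the same way):

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class KillSwitchDemo {
    public static void main(String[] args) throws InterruptedException {
        // an empty sink acts as the kill switch; completing it completes the interval flux
        Sinks.Empty<Void> killSwitch = Sinks.empty();
        CountDownLatch done = new CountDownLatch(1);

        Flux.interval(Duration.ofMillis(50))
                .takeUntilOther(killSwitch.asMono())
                .subscribe(System.out::println, e -> {}, done::countDown);

        Thread.sleep(200);
        killSwitch.tryEmitEmpty();          // stop the updates
        done.await(1, TimeUnit.SECONDS);    // the flux completes shortly after
    }
}
```

Compared to takeWhile(ignore -> running), the kill switch completes the stream immediately instead of waiting for the next tick.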
Jørgen Ringen
Using reactor-kafka, is there any way to continue on deserialization errors (kind of like log-and-continue-exception-handler in kafka-streams)?
Currently I am getting r.k.r.i.DefaultKafkaReceiver$PollEvent: Unexpected exception, and it makes the whole "pipeline" fail.
1 reply
Alwyn Schoeman

Hi, I'm looking for the correct pattern to do the following:

I need to do 3 or more operations against an external system. The inputs and outputs of each operation differ from one another, AND subsequent operations depend on the outputs of previous operations.

E.g. A -> B -> C -> D

Each following step checks the response code of the previous step.
If it was not successful, it should not continue with the rest of the processing but return the response of the previous operation.
If it was successful, it should use data from the previous response and perform the next action.

I am trying to avoid the scenario where I have the following nesting (pseudocode):

doA.flatMap { r ->
           if (r.code != 0) return@flatMap Mono.just(r)
           doB.flatMap { r ->
               if (r.code != 0) return@flatMap Mono.just(r)
               doC.flatMap {r ->
               if (r.code != 0) return@flatMap Mono.just(r)
etc, etc, etc, etc
1 reply
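To flatten the nesting above, one option is a small helper that short-circuits on a non-zero code, so the steps compose linearly instead of nesting. A Java sketch with a hypothetical Response type standing in for the real responses:

```java
import java.util.function.Function;
import reactor.core.publisher.Mono;

public class ChainDemo {
    // hypothetical response type: a code (0 = success) and a payload
    static class Response {
        final int code;
        final String data;
        Response(int code, String data) { this.code = code; this.data = data; }
    }

    // run the next step only if the previous one succeeded,
    // otherwise short-circuit with the failing response
    static Mono<Response> thenIfOk(Mono<Response> prev, Function<Response, Mono<Response>> next) {
        return prev.flatMap(r -> r.code == 0 ? next.apply(r) : Mono.just(r));
    }

    public static void main(String[] args) {
        Mono<Response> doA = Mono.just(new Response(0, "A"));
        Mono<Response> result =
                thenIfOk(thenIfOk(doA,
                        a -> Mono.just(new Response(0, a.data + "B"))),
                        b -> Mono.just(new Response(1, b.data + "C")));
        System.out.println(result.block().data); // ABC
    }
}
```

The chain then reads thenIfOk(thenIfOk(thenIfOk(doA, this::doB), this::doC), this::doD), one level deep regardless of the number of steps.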
Alex Mrynsky
looking for a "reactive" way to work with piped input/output streams to compress data using GZIPOutputStream. Any hint is appreciated
Tomas Krasny
Hi, can someone explain why the following code doesn't block when sending the 11th item? According to the docs, the EmitterProcessor (if it has no subscribers) buffers incoming items up to bufferSize (which I set to 10). The next onNext should then block (until it's drained), but it doesn't. Instead, it blocks on inserting the 17th element. Why does it successfully insert 16 elements? Why 16? v3.3.10.RELEASE.
public static void main(String[] args) {
    EmitterProcessor<Integer> processor = EmitterProcessor.create(10);

    for (int i = 1; i <= 20; i++) {
        System.out.println("sent: " + i);
        processor.onNext(i);
    }
}
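On the "why 16" question: Reactor's internal queues round the requested capacity up to the next power of two, which would explain 16 buffered elements for a bufferSize of 10. This is a reading of the internals, not an authoritative answer; the rounding helper itself is public:

```java
import reactor.util.concurrent.Queues;

public class QueueRounding {
    public static void main(String[] args) {
        // requested capacity 10 is rounded up to the next power of two
        System.out.println(Queues.ceilingNextPowerOfTwo(10)); // 16
    }
}
```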

Giorgi Andriadze
Hi, I want to understand how big a performance impact Mono.delay() has. I may have to create around 10k Monos with a random delay of a couple of seconds. Is that safe, or will that create a lot of threads? Sorry if it's a silly question; I'm new to this.
2 replies
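For context on Mono.delay(): it schedules a timer task on the shared Schedulers.parallel() pool (a handful of threads) rather than occupying a thread per Mono, so thousands of delayed Monos are generally fine. A quick sketch, with delays shortened to milliseconds for the demo:

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ManyDelays {
    public static void main(String[] args) {
        // 10k delayed Monos share the small Schedulers.parallel() pool;
        // each delay is a timer event, not a sleeping thread
        Long count = Flux.range(0, 10_000)
                .flatMap(i -> Mono.delay(Duration.ofMillis(
                        ThreadLocalRandom.current().nextLong(10, 100))).thenReturn(i), 10_000)
                .count()
                .block();
        System.out.println(count); // 10000
    }
}
```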
Sagar Raj
Hello, I am building a reactive pipeline: I read from an SQS queue and process the messages within the main pipeline. The data is grouped, and the grouped flux is subscribed. In my onNext, I receive a grouped flux; I process its content and pass it on to another consumer. As the subscription here is happening within onNext, I am guessing I should be creating a new instance of the consumer rather than autowiring it, right? Looking for some guidance on this :)
Hi Team - I have a use case where I need to run WebClient calls in parallel against 10 upstream systems with a 450ms timeout. A few of the upstream systems return results in 80-150ms at p99 latency, and a few take around ~300ms. I need to collect the data from each and then do the ranking. If I use collect() and then go for ranking, will that block the thread, since I need to get all the data first before ranking? Can someone explain the thread flow here? Thanks in advance.
Chakradhar Kasturi

Hi, I want help understanding whether we can maintain trace/span IDs through the entire pipeline.

For e.g.,

            .map(i -> i * 2)
            .filter(i -> i > 3)

I want to send a signal all the way downstream, a kind of unique ID (span/trace ID) for each element emitted. I am guessing it becomes tricky in the case of flatMaps or schedulers?

I tried using Hooks and a MutableContext.
I don't want to change the existing pipeline; that's why I thought Hooks were the best fit for my problem. Any idea is highly appreciated.

1 reply
Sagar Raj
Hi, I am looking for a recommendation on (groupBy & subscribe) vs (flux multicast and filter) to take different actions on different event types on the flux. Could someone please help, @simonbasle?

I need to take an object, send it to a server (using a WebClient), log the response, then do some work with it


The problem is I don't see the server response in the logs.
I assume I should not be calling WebClient.exchange() inside a .doOnNext().

What would be the correct way to do this?

1 reply
Sagar Raj

Hi, I am looking for a recommendation on (groupBy & subscribe) vs (flux multicast and filter) to take different actions on different event types on the flux. Could someone please help?

I am starting with reactive development, and I believe the approach is to share the flux and filter it in each of the pipelines. I understand that the thread count would be lower, but would this not increase CPU usage, as the computations are a lot higher? (I would use a switch case in the regular programming model.)

Sagar Raj

The incoming messages are of the type

 "messageType": "lightconfig",
  "state": "on"/"off",
  "deviceId": "A0001"

I guess there would be 15-20 categories eventually. The common part is retrieving the device details. After that, I see 2 options:

  1. Do the common part of the pipeline and share the observable. Then each subscriber can listen to this. Say there are 20 observers and 100 events: we would be running the filter computation 2000 times.
  2. Do the common part of the pipeline, use group-by to group the observable, and subscribe to the grouped observable with an observer. The observer will receive the Map<messageType, Observable<Message>>.

5 replies
Amardeep Singh Khera

Hi, I am using reactor.retry.Retry from the io.projectreactor.addons:reactor-extra:3.3.4.RELEASE lib to include retries in my reactive chain. Everything works as expected, but when I run against BlockHound I get a blocking exception as follows. Is there any recommended way to fix this, or is it a bug in the library itself?

at java.io.FileInputStream.readBytes(FileInputStream.java)
at java.io.FileInputStream.read(FileInputStream.java:255)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at java.time.zone.TzdbZoneRulesProvider.load(TzdbZoneRulesProvider.java:187)
at java.time.zone.TzdbZoneRulesProvider.<init>(TzdbZoneRulesProvider.java:113)
at java.time.zone.ZoneRulesProvider$1.run(ZoneRulesProvider.java:157)
at java.security.AccessController.doPrivileged(Native Method)
at java.time.zone.ZoneRulesProvider.<clinit>(ZoneRulesProvider.java:144)
at java.time.ZoneRegion.ofId(ZoneRegion.java:120)
at java.time.ZoneId.of(ZoneId.java:411)
at java.time.ZoneId.of(ZoneId.java:359)
at java.time.ZoneId.of(ZoneId.java:315)
at java.util.TimeZone.toZoneId(TimeZone.java:556)
at java.time.ZoneId.systemDefault(ZoneId.java:274)
at reactor.scheduler.clock.SchedulerClock.of(SchedulerClock.java:166)
at reactor.retry.AbstractRetry.<init>(AbstractRetry.java:66)
at reactor.retry.DefaultRetry.<init>(DefaultRetry.java:48)
at reactor.retry.DefaultRetry.create(DefaultRetry.java:58)
at reactor.retry.Retry.allBut(Retry.java:107)


Hi, I'm using Spring Framework's org.springframework.web.reactive.socket.WebSocketSession, and I create a UnicastProcessor (reactor.core.publisher.UnicastProcessor) to send messages. Like this:

publisher = UnicastProcessor.create();
 // [...] stuff
 WebSocketMessage message = new WebSocketMessage(WebSocketMessage.Type.BINARY, dataBuffer);

It is working perfectly; however, I'm curious what happens if I produce too many messages and my network can't keep up with them.

  • How do I detect such a thing?
  • What happens to my application by default?
  • What can I do if that happens? Should I implement some kind of backpressure?

I tried to search on Google, but no luck, so any help is appreciated.


For reactive GridFS: it seems to run out of memory. I am not able to apply backpressure... any hints please?

for (Map<String, Object> metadata : mdList) {
    saveObject(dataBuffer, metadata)
        .onErrorContinue((e, a) -> log.error(e.getMessage()))
        .subscribe(new BackpressureReadySubscriber());
}

public class BackpressureReadySubscriber<T> extends BaseSubscriber<T> {

    int limit = 100;
    int factor = limit;
    int delay = 5000;
    int consumed;

    @Override
    protected void hookOnError(Throwable throwable) {
        log.error(throwable.getMessage());
    }

    @Override
    public void hookOnSubscribe(Subscription subscription) {
        // log.info("Size of request" + factor);
        // initial request, but why is it called every time?
        request(factor);
    }

    @Override
    public void hookOnNext(T value) {
        consumed++;
        if (consumed == limit) {
            consumed = 0;
            log.info("Sleeping for " + delay / 1000 + " sec");
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.info("woke up after " + delay / 1000 + " sec");
            // further request data
            request(factor);
        }
    }
}


Erik Lumme
I saw a discussion from a year or two back about a way to set some default Context values for all publisher chains. This is what I came up with, not sure if I'm terribly ruining everything by doing so:
Hooks.onLastOperator("test-hook", publisher -> {
    if (publisher instanceof Mono) {
        return ((Mono<Object>) publisher).subscriberContext(SomeClass::initContext);
    }
    return ((Flux<Object>) publisher).subscriberContext(SomeClass::initContext);
});

Hello! I am using Reactor Kafka. I consume batches of messages from Kafka and store them in the DB asynchronously; the operation returns CompletableFutures.
This is what I have so far:

Flux<K> flux = receiver.receiveAtmostOnce()
            .bufferTimeout(maxBatchSize, batchTimeout)
            .doOnError(th -> {
                LOG.warn("Exception while handling records", th);
            })
            .doOnCancel(() -> LOG.info("Kafka receiver stopped"))
            .subscribe(records ->
                    // blocking call that I want to avoid
                    completableFutureArray = processRecords(records)
            );

I understand using a blocking call is bad, but:

  1. If I don't block, how will backpressure work? Won't the subscriber continuously fetch more items before it finishes processing the existing items?
  2. Mine is a slow-consumer, fast-producer problem, and that is why backpressure is important.

Any help/suggestion would be appreciated
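One way to avoid the blocking call while keeping backpressure: bridge the CompletableFuture with Mono.fromFuture and bound the concurrency in flatMap, so only a limited number of batches are in flight at once. A self-contained sketch in which the Kafka receiver is replaced by a plain range and processRecords is a stand-in for the async DB write:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class NonBlockingBatches {
    // stand-in for the async DB write that returns a CompletableFuture
    static CompletableFuture<Integer> processRecords(List<Integer> batch) {
        return CompletableFuture.supplyAsync(batch::size);
    }

    static Integer totalProcessed() {
        return Flux.range(1, 100)
                .buffer(10) // stands in for bufferTimeout(maxBatchSize, batchTimeout)
                // Mono.fromFuture bridges the future; the concurrency argument (2)
                // caps in-flight batches, which is what gives you backpressure
                .flatMap(batch -> Mono.fromFuture(processRecords(batch)), 2)
                .reduce(0, Integer::sum)
                .block(); // block only for the demo
    }

    public static void main(String[] args) {
        System.out.println(totalProcessed()); // 100
    }
}
```

With this shape, the upstream is only asked for more batches when a processing slot frees up; no Thread.sleep or blocking subscribe is needed.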

I have created a chain of fluxA.transform(performTaskA).transform(performTaskB).transform(persistData).transform(performTaskC). Is there a way to preserve the context of the output of performTaskA and make it available in performTaskC?
1 reply

I need help reading the field values from the ServerRequest object; this object holds only Part.class file content.

When I iterate the partFile object, it doesn't hold the form-field values.

public Mono<ServerResponse> getUploadFiles(ServerRequest request) {

        return request.multipartData()
                .flatMapMany(map -> Flux.fromIterable(map.values()))
                .flatMap(partFile -> ServerResponse.ok()
                        .body(Mono.just(fileStorageService.storeZipFile(partFile.get(0), "", "")), String.class))
                .next()
                .onErrorResume(e -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                        .body(Mono.just(e.getMessage()), String.class));
}

Along with the files, we are sending form-field values, for example: name: XYZ, date: 2020-10-13.
How do I read those form fields? Could anyone please help me with this?
Prakhar Tandon
I need help converting a Mono<PojoType> to simply obtain the PojoType object, without using block() on a reactor-http-epoll thread.
How does backpressure work? Is Thread.sleep() needed to slow down the consumer?
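On the block() question: the usual answer is to never leave the pipeline. Transform the Mono with map/flatMap and hand the resulting Mono onward, letting the framework subscribe. A tiny sketch with a hypothetical Pojo:

```java
import reactor.core.publisher.Mono;

public class NoBlock {
    // hypothetical payload type
    static class Pojo {
        final String name;
        Pojo(String name) { this.name = name; }
    }

    // instead of pojoMono.block(), transform inside the pipeline and
    // return the Mono; the caller (or the framework) subscribes to it
    static Mono<String> extractName(Mono<Pojo> pojoMono) {
        return pojoMono.map(p -> p.name);
    }

    public static void main(String[] args) {
        extractName(Mono.just(new Pojo("widget")))
                .subscribe(System.out::println); // prints "widget"
    }
}
```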
Marcus Rodan

Hello, I'm trying to understand how gRPC Client Cancellation works when using https://github.com/salesforce/reactive-grpc. I have an example program that creates a simple service, something like the following:

public class Service extends ReactorServiceGrpc.ServiceImplBase {
  public Mono<User> getUser(Mono<GetUserRequest> requestMono) {
    return requestMono
            .doOnNext(__ -> log.info("Got a request"))
            .doOnTerminate(() -> log.info("Terminated"));
  }
}

I expect to see "Terminated" shortly after "Got a request!" if I cancel the call from the client. However, I see the following output:

2020-10-25 16:17:09.633 [ault-executor-0] : Got request!
2020-10-25 16:17:14.634 [     parallel-2] : Terminated
2020-10-25 16:17:14.637 [     parallel-2] r.c.p.Operators : Operator called default onErrorDropped

io.grpc.StatusRuntimeException: CANCELLED: call already cancelled

Any ideas on why this happens and why I don't get the expected behavior?

Sagar Raj
What is the right way to handle backpressure in async subscribers?
Eugene Kamenev

Hi! Dear Reactor team, I cannot understand why this test case does not pass.

        //def scheduler = VirtualTimeScheduler.getOrSet();
        def rp = Sinks.many().replay().limit(Duration.ofSeconds(1))
        def flux = rp.asFlux().doOnNext({
            log.info it.toString()
        })

        for (int i = 0; i < 5; i++) {
            rp.emitNext(i, Sinks.EmitFailureHandler.FAIL_FAST);
        }
        // scheduler.advanceTimeBy(Duration.ofSeconds(2))
        for (int i = 5; i < 10; i++) {
            rp.emitNext(i, Sinks.EmitFailureHandler.FAIL_FAST);
        }

This test case passes only if I uncomment the virtual scheduler.

Hi, why not have a ServerResponse<Flux> in functional controllers?
Hi team, I'm looking for something that works a bit like Flux.sample(Duration), but which emits an element immediately if no previous element was received in the preceding sample window, and then resets the sample window from that point. As far as I can see, the Flux.sample(Duration) operator will always wait until the current sample window expires before emitting, even if nothing was emitted in the previous window. Does something like this already exist? I suspect it might be possible to implement what I need with the Flux.sample(Publisher) method but I'm not sure how to approach it, so any pointers here would be much appreciated. Thanks!
Omid Barat
How can I turn a Map<String, Mono<String>> into a Mono<Map<String, String>>?
1 reply
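One way to do this conversion: stream the entries, resolve each inner Mono, and collect back into a map. A Java sketch:

```java
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CollectMapExample {
    // resolve every value Mono and rebuild the map once all have completed
    static Mono<Map<String, String>> resolveAll(Map<String, Mono<String>> input) {
        return Flux.fromIterable(input.entrySet())
                .flatMap(e -> e.getValue().map(v -> Map.entry(e.getKey(), v)))
                .collectMap(Map.Entry::getKey, Map.Entry::getValue);
    }

    public static void main(String[] args) {
        resolveAll(Map.of("a", Mono.just("1"), "b", Mono.just("2")))
                .subscribe(System.out::println);
    }
}
```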
FluxElapsed operator has a single field as its state (lastTime) and it's not volatile. It's written to in onSubscribe and onNext methods which are guaranteed to be invoked sequentially, but not necessarily from the same thread. Where exactly is the memory barrier which ensures correctness of reads and writes to this field?

I've discovered something strange

Server side:

        GET("/flux") {
            val flux = Flux.range(0, 5)
//                    .map(Int::toString)

Client side:

                .doOnNext { logger.info(it.statusCode().toString()) }
                .doOnNext { it.headers().asHttpHeaders().forEach { logger.info("${it.key}: ${it.value}") } }
                .flatMapMany { it.bodyToFlux<String>() }

If the server returns a flux of integers,
then the response content-type is set to application/json;
the client waits for 5 seconds, then receives the response and body together;
the body is an array of integers:

2020-11-01 23:44:00  - GET /flux
2020-11-01 23:44:06  - 200 OK
2020-11-01 23:44:06  - transfer-encoding: [chunked]
2020-11-01 23:44:06  - Content-Type: [application/json]
2020-11-01 23:44:06  - [0,1,2,3,4]

If the server returns a flux of strings,
then the response content-type is set to text/plain;
the client receives the response immediately, then waits for 5 seconds, and receives the response body;
the body is a concatenated string:

2020-11-01 23:45:07  - GET /flux
2020-11-01 23:45:08  - 200 OK
2020-11-01 23:45:08  - transfer-encoding: [chunked]
2020-11-01 23:45:08  - Content-Type: [text/plain;charset=UTF-8]
2020-11-01 23:45:12  - 01234
Why this difference in behavior?
If the server returns a flux of strings and I manually set the content-type to application/json, it makes no difference: the client still receives the response first and the body 5 seconds later, as a concatenated string.
What do I need to do to make it return a proper array of strings?
1 reply
And what's the principal difference between returning a Flux<String> and a Mono<List<String>>?
(provided I don't intend to use content-type application/stream+json or text/event-stream)
2 replies

Hi there, I'm trying to implement a custom queue for a UnicastProcessor inside a Spring application, to handle bursts (I just want to drop the incoming message if there are too many). My problem is, if I do

UnicastProcessor<WebSocketMessage> publisher = UnicastProcessor.create(myCustomqueue);

then my application totally freezes over time, and NOTHING works. There are no more logs or anything like that. However, if I create the UnicastProcessor with the zero-parameter factory, then everything works fine.

Here's my custom queue implementation, it's really simple:

public class LimitedQueue<E> extends ArrayDeque<E> {

    private final int limit;

    public LimitedQueue(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean offer(E o) {
        if (size() >= limit) {
            return false;
        }
        return super.offer(o);
    }
}


I found a really similar question from 8 months ago here as well:

Could anyone help me out?

1 reply
Fırat Sivrikaya
Hey everyone, I have just started to learn Spring Reactor. I've been reading the reference documentation, but I am having a hard time understanding the concepts and find the reference a little complex for a beginner like me. Also, I couldn't find real-world examples on the internet. Can you please suggest some resources for better understanding Reactor and WebFlux? All the examples that I see on the internet mostly consist of creating a flux and then subscribing to it. Maybe someone could share open source real-world projects with tests that I could dive into. I am losing a lot of motivation here :)
Violeta Georgieva
@firatsivrikaya What exactly do you need? There are a lot of resources on the web. Josh Long has a lot of videos that you can check, for example https://youtu.be/_LR0Cxnn-kw?t=1649
Mikael Elm
Also, if you haven't taken that approach already: read the reference patiently. Don't skip parts or rush through it. IMO it is then a great resource. But I agree, there are many good videos out there from conference talks where introductions to WebFlux/Reactor are covered.

Hi everyone, I'm trying Reactor Kafka (I'm basically a newbie at it) and made the samples work. However, the consumer keeps spamming the following logs:

2020-11-04 10:01:36.007+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.FetchSessionHandler              : [Consumer clientId=sample-consumer, groupId=sample-group] Node 0 sent an incremental fetch response for session 569816377 with 0 response partition(s), 1 implied partition(s)
2020-11-04 10:01:36.008+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.c.i.Fetcher                      : [Consumer clientId=sample-consumer, groupId=sample-group] Added READ_UNCOMMITTED fetch request for partition test2-0 at position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=abbelfynsrv:9092 (id: 0 rack: null), epoch=0}} to node abbelfynsrv:9092 (id: 0 rack: null)
2020-11-04 10:01:36.008+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.FetchSessionHandler              : [Consumer clientId=sample-consumer, groupId=sample-group] Built incremental fetch (sessionId=569816377, epoch=35) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
2020-11-04 10:01:36.008+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.c.i.Fetcher                      : [Consumer clientId=sample-consumer, groupId=sample-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(test2-0)) to broker abbelfynsrv:9092 (id: 0 rack: null)

Is there a way to disable this? I can show my consumer's code if needed.

Jens Geiregat

Hi all,

I'm developing a Spring-Boot Reactor based application that we're currently performance testing. The basic data flow is:

  1. Fetch a lot of data from the DB using r2dbc-postgresql
  2. Do some heavy calculations on that data
  3. Return an aggregation of that data

Now we're having issues. When the system is under load, our DB reports responding within < 200ms in most cases, while a lot of our trace data shows that we only receive the first result from the DB in the code after ~1s. And I have no clue where that time difference is coming from.

Some things I have tried:

  • moving the CPU-heavy calculations to the parallel scheduler
  • adding artificial buffering to the stream to be able to measure throughput and avoid backpressure problems; but still, the DB code seems extremely slow

The only thing left for me to try is to throw out the r2dbc-postgresql repository, replace it with plain old JDBC, and schedule that on a boundedElastic scheduler?

Any idea is welcome...

Hi guys,
I am developing an application using Spring WebFlux with Cassandra as the database. I have a use case where I want to sort data on a particular field, fetch the top 10 records, and display them in the UI. Due to Cassandra limitations I cannot do this directly through a database query.
I have tried fetching all the data from the table, sorting it in the Java layer, and then returning the top 10 records to the UI. Since I have millions of records in the table, fetching all the data and sorting in the Java layer takes a significant amount of time.
Is there a better approach to achieve the same in the Java layer?
8 replies
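For the top-10 question, one option that avoids sorting millions of rows in memory: stream the rows and keep only the k best in a bounded heap, so memory stays O(k) regardless of table size. A Java sketch over a plain integer flux (the Cassandra reactive repository would be the real source, and the comparison field would replace the raw integers):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class TopK {
    // keep only the 10 largest elements while streaming, instead of sorting everything
    static Mono<List<Integer>> top10(Flux<Integer> rows) {
        return rows.reduce(new PriorityQueue<Integer>(), (heap, value) -> {
                    heap.offer(value);
                    if (heap.size() > 10) heap.poll(); // evict the smallest
                    return heap;
                })
                .map(heap -> {
                    List<Integer> result = new ArrayList<>(heap);
                    result.sort(Comparator.reverseOrder()); // largest first
                    return result;
                });
    }

    public static void main(String[] args) {
        top10(Flux.range(1, 1_000_000)).subscribe(System.out::println);
    }
}
```

The rows still have to be read once, but only 10 of them are ever held at a time, which removes the sort-everything-in-Java cost.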
Knut Schleßelmann

I've got a library which parses a lot of XML data and where I have to provide a callback saying what should be done with the parsed data. Now I'm trying to incorporate this lib into my reactive application like this:

private fun InputStream.parse(): Flux<ParsedProduct> = Flux.create { sink ->

    val requested = AtomicLong(0)

    sink.onRequest {
        if (logger.isTraceEnabled) {
            logger.trace("Requested ParsedProducts", kv("number", it))
        }
        requested.addAndGet(it)
    }

    val parser = XmlArchiveParser(ProductParser()) {
        if (logger.isTraceEnabled) {
            logger.trace("Parsed product", kv("productId", it.productId))
        }
        while (requested.get() <= 0) {
            logger.trace { "Waiting for requests" }
            Thread.sleep(100)
        }
        requested.decrementAndGet()
        sink.next(it)
    }
}



Is this a proper way to implement this hybrid push/pull backpressure mechanism? If so, which thread currently gets blocked by the Thread.sleep? Should I subscribe this stuff on boundedElastic()?

I want to have two router endpoints: one POST that receives some data and sends it to a "channel", and one GET that sends out everything from that channel as Server-Sent Events.
I've seen such channels in Go.
Is there something similar in Reactor?
3 replies
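Reactor's closest analogue to a Go channel here is a multicast sink: the POST handler emits into it, and the GET handler returns it as a flux with content type text/event-stream. A sketch using the Sinks API (Reactor 3.4+; the handler method names are hypothetical):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class EventChannel {
    // a multicast sink behaves much like a Go channel with fan-out:
    // every active subscriber sees events emitted after it subscribed
    private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

    // call this from the POST route handler
    public void publish(String event) {
        sink.tryEmitNext(event);
    }

    // return this from the GET route handler as text/event-stream
    public Flux<String> events() {
        return sink.asFlux();
    }
}
```

Note that, unlike a Go channel, emitting does not block the producer; tryEmitNext reports failure through its result instead.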

I'm using Spring WebClient (based AFAIK on the Reactor Netty HttpClient?).
I'm sending a stream of 5 values with a one-second delay (Flux<Object>, content-type = application/x-ndjson) to the server, and receiving the response as a Flux as well.
I expect to see something like this in the logs:

object #1 send
response #1 received
object #2 send
response #2 received

Objects are sent one by one, and responses are also received one by one,
but WebClient starts receiving the response only after it finishes sending the request.
I've found a similar question on SO: https://stackoverflow.com/questions/52113235
In it, @bclozel says that Spring WebClient and the Reactor Netty HttpClient only start processing the response after they have finished processing the request.
Also, @violetagg told me that if the Netty server sends a response before it has finished processing the request body, then the remaining request body is discarded (?), as per the HTTP spec (?).
Could you please confirm that this is the way WebFlux and Reactor Netty work by design?

Interestingly, another answer on SO says that the Jetty-based WebClient works differently: it is able to start receiving the response before it finishes sending the request.
Is the restriction on the client side, the server side, or both?

Violeta Georgieva


Also, @violetagg told me that if the Netty server sends a response before it has finished processing the request body, then the remaining request body is discarded (?), as per the HTTP spec (?)

And this is true if you finalise the response, not if you just start writing the response.