Burim Krasniqi
senderOptions = senderOptions
    .producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "SampleTxn");

sender.sendTransactionally(source.map(r -> Flux.fromIterable(transform(r))))
      .concatMap(r -> r)
      .doOnError(e -> log.error("Send failed, terminating.", e))
      .doOnNext(r -> log.debug("Send completed {}", r.correlationMetadata()));
The Kafka producer cannot be reused when I use it with transactions. It complains that the same transactionalId cannot be used in different calls at the same time (it throws an exception only for parallel calls). I can solve this by creating different producers with different transactionalIds, but that is not as efficient since it consumes more threads internally. Is there any alternative that lets me use the same Kafka producer - KafkaSender.create(senderOptions) - with different transactionalIds without having to create and close it each time?
1 reply
Hi - quick question. Let’s say I’ve got a flux that will produce millions of entries. Each entry has an id: if we have not seen this id yet in the flow, we need to do some extra work (e.g. retrieve something); otherwise we just get on with processing the entry. There are a large number of ids, so groupBy doesn’t seem right. What is the Reactor way to deal with this? I could use some state - like a set - to record what I’ve seen, but I’m wondering if there is a more idiomatic approach.
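A minimal sketch of the external-state approach, assuming integer ids and a hypothetical "enrichment" step. Reactor's built-in `distinct(keySelector)` keeps such a set for you, but it drops duplicates entirely; here both paths must emit, so the set lives outside the pipeline. Note the set grows with the id cardinality; for millions of ids a bounded cache may be preferable.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FirstSeenDemo {
    // For each id: extra work the first time it is seen, fast path afterwards.
    // The Set is the only state; newKeySet() keeps it thread-safe.
    static Flux<String> process(Flux<Integer> ids) {
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        return ids.concatMap(id -> seen.add(id)
                ? Mono.fromCallable(() -> "enriched-" + id)  // first sighting: extra lookup
                : Mono.just("plain-" + id));                 // already known
    }

    public static void main(String[] args) {
        process(Flux.just(1, 2, 1, 3)).doOnNext(System.out::println).blockLast();
    }
}
```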
Brigen Tafilica

Bumping my question
Hi, it’s me again. Sorry to bother everyone.

I am trying to read all accounts from a Mongo collection (a LOT, like 1 million, and this is mandatory).
Then I need to do some operations on each account (sum all the values inside a big array, plus some other arithmetic like multiplication and Math.pow) and then update them in the Mongo collection again.
Can someone help me with the best approach (maybe some demo code) to do this? I’m struggling with the performance.
Also, we might want to scale vertically, going from 32 CPU cores to 64. I assume that since we have I/O-bound processing, scaling vertically will not help much - or am I wrong? Is there also an implementation that takes this scaling into consideration? Thanks a lot in advance
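A sketch of the usual shape for this kind of job, under stated assumptions: `findAll()` and `recompute()` below are stand-ins for the reactive Mongo read and the per-account arithmetic. The arithmetic (sums, Math.pow) is CPU-bound, so extra cores do help there; the Mongo read/write is the I/O side, where bounded concurrency and bulk writes usually matter more than core count.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BulkRecompute {
    // Stand-in for the reactive Mongo read; in real code this would be
    // something like repository.findAll() returning Flux<Account>.
    static Flux<double[]> findAll() {
        return Flux.range(1, 1000).map(i -> new double[] { i, i + 1, i + 2 });
    }

    // The CPU-bound part: summing the array plus some arithmetic.
    static double recompute(double[] values) {
        double sum = 0;
        for (double v : values) sum += Math.pow(v, 2);
        return sum * 2;
    }

    public static long run() {
        return findAll()
            // bounded concurrency: at most 256 recomputations in flight;
            // Schedulers.parallel() because this stage is CPU-bound
            .flatMap(acc -> Mono.fromCallable(() -> recompute(acc))
                                .subscribeOn(Schedulers.parallel()), 256)
            // .flatMap(repository::save, 64)  // the write-back is the real I/O stage
            .count()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```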

josh gruenberg

hi there. I'd appreciate some advice: I have a flux of timestamped events (from kafka), and I need to collate them into event-time windows, with a configured accumulation timeout. the events aren't strictly ordered, but I can tolerate windows closing prematurely: stragglers that arrive after their previously-open window has closed can just be allocated into a new, separate window for the same time period.

I seek an appropriate strategy for achieving this without leaking resources or degrading performance as the cardinality of event-time windows grows without bound.

at first glance, something like the following seems like it might work:

events.groupBy(event -> event.timestamp().truncatedTo(windowDuration))
        .flatMap(group -> group.window(windowAccumulationTimeout))

however, I'm concerned about the implications of groupBy for that unbounded stream of groups. I suspect this might be problematic because each group will remain in the groupBy state forever, even after the window-timeout expires for the last straggling event in each window.

am I wrong to be concerned about this? is there a better way to do what I need?
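One possible shape (a sketch, not a definitive answer): make each group complete itself with `take(duration)`. Completing a group removes its key from groupBy's internal state, and a straggler with the same key simply opens a fresh group - which matches the stated tolerance for premature window closes. Integers stand in for events here, and `i / 5` stands in for timestamp truncation; note the flatMap concurrency must exceed the number of concurrently open keys.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class WindowByKey {
    public static List<List<Integer>> run() {
        return Flux.range(0, 10)
            .delayElements(Duration.ofMillis(10))
            .groupBy(i -> i / 5)   // stand-in for timestamp().truncatedTo(windowDuration)
            // take(duration) completes the group after the accumulation timeout,
            // releasing the key from groupBy's state; a straggler with the same
            // key would just start a new group
            .flatMap(g -> g.take(Duration.ofSeconds(1)).collectList())
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```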

7 replies
Salathiel Genèse

Hey guys

I trust you all are thriving.

For reusability concerns, I run my validation on the service layer, which returns Mono.error( constraintViolationException )...

So that my web handlers merely forward the unmarshalled domain to the service layer.

So far, so great.

But how do I advise (AOP) my web handlers so that they return HTTP 422 with the formatted constraint violations?

WebExchangeBindException only handles exceptions thrown synchronously.

My AOP advice triggers an error because:

  • my web handler returns Mono<DataType>
  • but my advice returns a ResponseEntity

And if I wrap my response entity (from the advice) into a Mono<ResponseEntity>, I get an HTTP 200 OK with the response entity serialized :(

Salathiel Genèse
The link to the SO question, with code excerpt - https://stackoverflow.com/q/69519724/3748178
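One avenue worth noting (a sketch, not from the chat): in WebFlux, `@ExceptionHandler` methods in a `@RestControllerAdvice` are applied to errors signaled through the returned Mono as well, not only to synchronous throws, so a `Mono.error(constraintViolationException)` from the service layer can be mapped to a 422 without AOP. Class and method names here are illustrative.

```java
import javax.validation.ConstraintViolationException;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;

@RestControllerAdvice
public class ValidationAdvice {
    // Invoked when any handler's returned Mono/Flux terminates with a
    // ConstraintViolationException, not only for synchronous throws.
    @ExceptionHandler(ConstraintViolationException.class)
    public ResponseEntity<String> onViolation(ConstraintViolationException ex) {
        return ResponseEntity.unprocessableEntity()   // HTTP 422
                .body(ex.getMessage());
    }
}
```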
Zane XiaoYe
Hi, I’m working on rewriting a periodic polling job (using the @Scheduled annotation) into a reactive stream. The system pulls messages from a message queue (not Kafka). It is thread-safe, and the previous implementation uses multiple clients/threads to do the pulling. After the messages are pulled, we need to do some processing (enrichment, sending to another queue) and then ack back to the message queue.
What is the equivalent operator/API to do the same work using Reactor?
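A common way to bridge a blocking poll loop into a Flux (a sketch; the BlockingQueue stands in for the real queue client): wrap one poll in `Mono.fromCallable`, keep it off the event loops with `subscribeOn(boundedElastic())`, and `repeat()` to resubscribe for each message. In real code you would add a delayed retry (e.g. `repeatWhen` with a backoff) for empty polls, and flatMap stages for enrichment, forwarding, and acking.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class PollingBridge {
    // repeat() resubscribes the callable once per message; boundedElastic
    // keeps the blocking poll off the event-loop threads.
    static Flux<String> poll(BlockingQueue<String> queue) {
        return Mono.fromCallable(queue::poll)
                .subscribeOn(Schedulers.boundedElastic())
                .repeat();
    }

    public static List<String> run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        for (int i = 0; i < 5; i++) queue.offer("msg-" + i);
        return poll(queue)
                .take(5)                    // demo only: stop after the seeded messages
                .map(m -> m + ":enriched")  // processing stage
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```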
Daniil Mikhaylov

Hi there!
I have a question. Why is it recommended to run blocking code in Mono.fromRunnable() or Mono.fromCallable() rather than in Mono.defer() or mono.flatMap()?
I mean, if I look at the sources of Mono.fromRunnable(), it actually does much the same as Mono.defer():

    public void subscribe(CoreSubscriber<? super T> actual) {
        MonoRunnableEagerSubscription s = new MonoRunnableEagerSubscription();
        actual.onSubscribe(s);
        if (s.isCancelled()) {
            return;
        }
        try {
            run.run();
            actual.onComplete();
        } catch (Throwable ex) {
            actual.onError(Operators.onOperatorError(ex, actual.currentContext()));
        }
    }
The blocking code runs in the subscribe() method. Doesn't the Supplier in Mono.defer() do the same?
Does it matter if I switch threads with the .subscribeOn() operator?
MonoSubscribeOn also gives me pause. It does worker.schedule(() -> s.request(n)) on the parent when subscribe happens. But I can't work out why request() is scheduled on the worker and subscribe() is not.


Hi all,

I have a function that deletes a user both from the database and from Keycloak. Sometimes it happens that the user gets deleted from the database but remains in Keycloak. Does anyone have an idea what the problem could be? It only happens from time to time, and I don't know why.

        return _companyRepository.findById(companyId)
            .flatMap(companyDocument -> {
                final List<Employees> employees = companyDocument.getEmployees();
                if (employees != null && !employees.isEmpty()) {
                    final boolean isRemoved =
                        employees.removeIf(employee -> employee.getId().equals(employeeId));
                    if (isRemoved) {
                        return _companyRepository.save(companyDocument).thenReturn(employeeId);
                    }
                }
                return Mono.just(employeeId);
            })
            .flatMap(employeeId -> _keycloakService.deleteUser(employeeRealm, employeeId))
            .onErrorResume(e -> Mono.empty());
Salathiel Genèse
And a StackOverflow +50 bounty

Hi, I am using SSE in the Spring web flux application. I have created an SSE endpoint that gets data from Redis pub-sub and Redis-pub gets data from an External WebSocket connection.

My problem is that when there is no data from the WebSocket, then as soon as I try to connect to my SSE endpoint the connection terminates due to the unavailability of data. My hunch is that I need to send comment heartbeats when data is not coming. How can I do that?

private fun realtimeSinkInitializer() {
    // onBackpressureBuffer by default cancels the subscription when the last
    // client closes; we set autoCancel to false
    realtimeSink = Sinks.many().multicast().onBackpressureBuffer(1000, false)

    // (elided) subscription to the Redis channel:
    //     .map(ReactiveSubscription.Message<String, String>::getMessage)
    //     .map { msg -> /* deserialize */ }
    // with an onNext consumer { weatherData: List<WeatherDto> -> /* emit to sink */ }
    // and an error consumer
    //     { err -> logger.error(err) { "Error occurred redis subscription channel" } }
}

fun getRealtimeWeather(): Flux<ServerSentEvent<Map<String, WeatherDto>>> {
    return weatherSink.asFlux()
        .map { e -> ServerSentEvent.builder(e).build() }
        .onErrorResume { e ->
            logger.error(e) { "Error getting realtime weather data" }
            Flux.empty()
        }
}
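The usual heartbeat approach (a sketch, independent of the code above): merge the data stream with a periodic comment-only event so the connection is never silent. In Spring the heartbeat would be `ServerSentEvent.builder().comment("keep-alive").build()`, which clients ignore; plain strings stand in for events here so the demo is self-contained.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

public class HeartbeatDemo {
    // Merge data with periodic heartbeats so proxies/clients don't drop
    // the idle connection while no data is arriving.
    public static String firstSignal(Flux<String> data) {
        Flux<String> heartbeats = Flux.interval(Duration.ofMillis(100))
                .map(i -> ":keep-alive");
        return Flux.merge(data, heartbeats)
                .blockFirst(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        // no weather data at all: the client still receives heartbeats
        System.out.println(firstSignal(Flux.never()));
    }
}
```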
2 replies
Mantas Matūzas
Hello, I'm trying to integrate reactive and non-reactive code. The use case is to have a non-reactive event listener call a reactive service. Do you think it is fine to do something like this?
  public void handle(TheEvent event) {
      reactiveEventService.process(event).subscribe();
  }
Salathiel Genèse


The subscribe() method returns a Disposable... If you do not keep a reference to that object, when do you call its dispose() method?


Even if you do, it doesn't really make the whole thing reactor-compliant

What you need (I think)
is a publisher [maybe within your service] to which you can PUSH TheEvent instances as they come in.
The handle( TheEvent ) method from your excerpt pushes events and does not subscribe to anything.
Salathiel Genèse
Your reactiveEventService will be responsible for subscribing to that publisher OR, if it exposes that publisher (Mono, Flux or else), whatever code wants to listen can subscribe to it.
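A sketch of that sink-backed bridge (names are illustrative; String stands in for TheEvent): the non-reactive listener pushes into a Sink, and the sink's Flux is the publisher the reactive side subscribes to.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class EventBridge {
    // The non-reactive listener pushes into the sink instead of subscribing.
    private final Sinks.Many<String> events =
            Sinks.many().multicast().onBackpressureBuffer();

    public void handle(String event) {   // called from non-reactive code
        events.tryEmitNext(event);
    }

    public Flux<String> events() {       // the reactive side subscribes here
        return events.asFlux();
    }

    public static List<String> run() {
        EventBridge bridge = new EventBridge();
        // multicast().onBackpressureBuffer() buffers events emitted before the
        // first subscriber arrives, so these two are not lost
        bridge.handle("created");
        bridge.handle("updated");
        return bridge.events().take(2).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```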
Mantas Matūzas
@SalathielGenese makes sense. I will try it.
Salathiel Genèse
Anand Jaisy
public DiscountCommand create(Mono<String> insertId, DiscountCommand discountCommand) {
    return insertId.map(item -> {
        return new DiscountCommand(/* ... */);
    }).block(Duration.of(1000, ChronoUnit.MILLIS));
}
If insertId is null, it throws a NullPointerException and the application hangs indefinitely. How can I handle this error?
Salathiel Genèse
1./ You may want to annotate your parameter as non null

2./ You can use if/else OR Optional

Optional.ofNullable( insertId )
  .map( __ -> insertId.map( item -> new /* ... */ ) )
  .orElseGet( Mono::empty )
  .block(/* duration */);

Something like that (untested)

9 replies
Hello, how can I emit an item received through a REST endpoint without a blocking call to emit? I want to emit the item to a sink and return the item as the response.
Sinks.Many<String> inputSink = Sinks.many().multicast().onBackpressureBuffer();

public Mono<String> process(String uid) {
    inputSink.emitNext(uid, (signalType, emitResult) -> {
        return false; // do not retry failed emissions
    });
    return Mono.just(uid);
}
Basically, I want to process the item received via the REST endpoint asynchronously. I would like to subscribe to the processed items on another thread as shown below.
I am subscribing to inputSink, processing each item, and emitting to outputSink.
Sinks.Many<String> outputSink = Sinks.many().unicast().onBackpressureBuffer();

public Flux<String> getProcessedItems() {
    return outputSink.asFlux();
}
Is this approach correct?

Hi folks. I'm trying to use Sinks.many().unicast().onBackpressureBuffer() for the first time and I have unexpected result. Here is how I use it

// on Thread B

  .map {
    // doing work here
    // executed on Thread B, instead of THREAD-POOL-A
  }
I see that flushingSink.tryEmit is blocking AND the queued messages are processed on the calling thread. I thought they would be processed asynchronously. What am I doing wrong?
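For context, a minimal sketch of the usual fix: a sink delivers signals on whichever thread calls tryEmitNext, so to process on a different pool you have to hop explicitly with `publishOn` (names here are illustrative, not from the snippet above).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

public class SinkThreadHop {
    // Without publishOn, the .map would run on the emitting thread.
    public static String processingThread() throws Exception {
        Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
        CompletableFuture<String> threadName = new CompletableFuture<>();
        sink.asFlux()
            .publishOn(Schedulers.boundedElastic())   // hop off the emitting thread
            .doOnNext(v -> threadName.complete(Thread.currentThread().getName()))
            .subscribe();
        sink.tryEmitNext("x");                        // emitted from the main thread
        return threadName.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processingThread());       // a boundedElastic-* thread
    }
}
```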

3 replies

Does anybody know if passing Mono/Flux around in the project is the right way of doing chaining? It seems like I'm losing performance on all these Reactor wrappers. In the profiler I see a call stack like this one: https://snipboard.io/fXT6KW.jpg

I see calls as deep as 150 nested Reactor invocations.

1 reply
Hi folks! What is the right answer to this question ? https://stackoverflow.com/questions/31185129/parallel-dispatch-of-groupby-groups-in-reactor
Basically I'm trying to do the same - I want to process some work in parallel, always preserving the original order per group (even if a group changes thread later).
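A sketch of one common answer to this (assumptions: small, bounded key cardinality): parallelism ACROSS groups, sequential order WITHIN each group - each group hops to a scheduler with `publishOn`, and the operators inside the group stay sequential, so per-key order is preserved.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class OrderedGroups {
    public static List<String> run() {
        return Flux.range(0, 20)
            .groupBy(i -> i % 4)                        // 4 groups, processed in parallel
            .flatMap(g -> g.publishOn(Schedulers.parallel())
                           .map(i -> g.key() + ":" + i)) // sequential within the group
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```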
6 replies
Hi there!
I want to read a message and then execute an HTTP request. If the HTTP request returns 2xx, ack the message; otherwise, nack it. But the problem is that flatMap returns Mono<ResponseEntity> and I can’t pass the original amqp message to map.
RabbitFlux.createReceiver().consumeManualAck(QUEUE_NAME, consumeOptions)
        .flatMap(amqpMessage -> {
            String url = new String(amqpMessage.getBody());
            return webclient.get().uri(url).retrieve().toBodilessEntity();
        })
        .map(responseEntity -> {
            if (responseEntity.getStatusCode().is2xxSuccessful()) {
                // how to pass here amqpMessage?
            } else {
                // ...
            }
            return amqpMessage; // does not compile: amqpMessage is no longer in scope
        });
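The usual fix is to nest the map INSIDE the flatMap lambda so the message stays in scope. A self-contained sketch (the `fakeHttpCall` stands in for `webclient.get().uri(url).retrieve().toBodilessEntity()`, and the ack/nack calls are represented by string tags):

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class AckAfterHttp {
    // Stand-in for the WebClient call: returns a fake HTTP status.
    static Mono<Integer> fakeHttpCall(String msg) {
        return Mono.just(msg.endsWith("ok") ? 200 : 500);
    }

    // Because map is nested inside the flatMap lambda, amqpMessage is
    // still in scope when the response arrives.
    public static List<String> run() {
        return Flux.just("m1-ok", "m2-bad")
            .flatMap(amqpMessage -> fakeHttpCall(amqpMessage)
                .map(status -> status / 100 == 2
                    ? amqpMessage + ":acked"     // amqpMessage.ack() in real code
                    : amqpMessage + ":nacked"))  // amqpMessage.nack(...) in real code
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```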
1 reply
Hi everyone. I have two hot fluxes running in parallel and one mono. I want the mono to always execute after both fluxes have completed. I tried to concat the first flux and the mono, but the mono executes immediately after the first flux completes. How do I make sure the mono always runs after both parallel fluxes have completed? Below is just an example.

var sourceFlux = Flux.range(1, 10);
var hotflux1 = sourceFlux.publish().refCount(2);
var firstFlux = hotflux1.map(i -> i * 2).log("First");

// this is an IO-blocking call
var secondFlux = hotflux1; // elided: blocking db call

var firstMono = Mono.just(300).log("First Mono");
var concatedFlux = Flux.concat(firstFlux, firstMono);

3 replies
In reality, the first flux is calling some service, the second flux is a db call, and the mono is also a service call. The mono requires data from both the first flux and the second flux before calling its service.
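A sketch of one way to sequence this: `Mono.when` waits for ALL of its sources to complete, and `then(...)` only subscribes to the mono afterwards, whereas `Flux.concat` only sequences one flux before the mono. The completion-order list below is just instrumentation for the demo.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class AfterBothFluxes {
    public static List<String> run() {
        List<String> order = new CopyOnWriteArrayList<>();
        Flux<Integer> source = Flux.range(1, 10).publish().refCount(2);
        Flux<Integer> first = source.map(i -> i * 2)
                .doOnComplete(() -> order.add("first"));
        Flux<Integer> second = source.publishOn(Schedulers.boundedElastic())
                .doOnComplete(() -> order.add("second"));
        Mono<Integer> last = Mono.fromCallable(() -> { order.add("mono"); return 300; });
        // when() subscribes to both fluxes (satisfying refCount(2)) and
        // completes only when both have completed; then() runs the mono after
        Mono.when(first, second).then(last).block();
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());   // "mono" is always last
    }
}
```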
Ben Tomasini

I am using an approach similar to this to lift a reactor context into ThreadLocal for use by a grpc client interceptor.


I am getting some stale values due to some bugs in the way I am using it. My main concern with this approach is its dependence on the onNext call to lift a fresh context into the ThreadLocal. If used incorrectly - namely, not doing a contextWrite on a stream - it can lead to leaked values from other threads.

One way I thought of to add more safety would be to clear the ThreadLocal in the onComplete or onError of the subscriber. From my initial inspection this appears to work because, so far, onComplete and onError are invoked on the same thread as onNext.

Question: is it guaranteed that onComplete and onError are called on the same thread as onNext for a given subscriber like this?

1 reply

Is there any exhaustMap operator on reactor core ?

I have the same question; I came across the API doc of Reactor 3, can't find an equivalent operator, and Google shows me this channel and conversation

Marek Andreansky
Can you recommend a well written open source library that utilizes Reactor? I would like to read through some real code to see how I could utilize Reactor.
Salathiel Genèse
Spring Webflux !?
Netty ?
John Meehan
Looking for a way to chunk a Flux<ByteBuffer> into specified chunk sizes in bytes. Does such a solution already exist somewhere?

concatMap and external state for buffer remainders less than specified size?

John Meehan
That's the path I'm headed down. Coming from Akka Streams and Scala, keeping external state seemed like the wrong approach, but I see from other examples that it's not out of the norm here. Something like this?
    public static Flux<ByteBuffer> chunker(Flux<ByteBuffer> data, int chunkSize) {
        return Flux.defer(() -> {
            AtomicLong countDown = new AtomicLong(chunkSize);
            return data.bufferUntil(buf -> {
                if (countDown.addAndGet(-buf.remaining()) <= 0L) {
                    countDown.set(chunkSize);   // reset for the next chunk
                    return true;
                }
                return false;
            }).concatMap(bufList -> {
                int size = bufList.stream().mapToInt(ByteBuffer::remaining).sum();
                ByteBuffer joined = bufList.stream()
                        .reduce(ByteBuffer.allocate(size), ByteBuffer::put);
                return Mono.just((ByteBuffer) joined.flip());   // make the chunk readable
            });
        });
    }
4 replies
Hi, how do I implement throttling in Reactor?
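"Throttling" covers several things in Reactor; a minimal sketch of the simplest one: `delayElements` spaces emissions out to at most one element per interval. For limiting upstream demand instead, see `limitRate`; for dropping excess elements, see `sample` or `onBackpressureDrop`.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

public class ThrottleDemo {
    // delayElements emits at most one element per 50 ms (≤ 20 elements/second),
    // so 5 elements take at least ~250 ms end to end.
    public static long elapsedMillis() {
        long start = System.nanoTime();
        Flux.range(1, 5)
            .delayElements(Duration.ofMillis(50))
            .blockLast();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println(elapsedMillis());   // ≥ ~250
    }
}
```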
Daniel Wahlqvist
Can I do a nested groupBy? I have equipment, and for each piece of equipment I have different metrics. So I would like to do a groupBy per equipment and then an inner groupBy per type of metric. That might be the wrong approach of course, but I'm mostly trying to get started and learn
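Nested groupBy does work; a sketch with an illustrative `Reading` record, where each inner group is reduced to a per-equipment/per-metric count. Caveat: groupBy holds state per distinct key, so this fits best when both cardinalities are bounded (or the groups are made to complete).

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class NestedGroupBy {
    record Reading(String equipment, String metric, double value) {}

    public static List<String> run() {
        return Flux.just(
                new Reading("pump-1", "temp", 40), new Reading("pump-1", "temp", 42),
                new Reading("pump-1", "rpm", 900), new Reading("pump-2", "temp", 38))
            .groupBy(Reading::equipment)                 // outer: per equipment
            .flatMap(byEquipment -> byEquipment
                .groupBy(Reading::metric)                // inner: per metric
                .flatMap(byMetric -> byMetric.count()    // reduce each inner group
                    .map(n -> byEquipment.key() + "/" + byMetric.key() + "=" + n)))
            .sort()                                      // deterministic output order
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```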
2 replies
josh gruenberg

is there a straightforward way to achieve a graceful flush/shutdown with an infinite source? I have a Flux from a KafkaReceiver, and at shutdown, I want to stop receiving messages from upstream, but allow all of the in-flight work to complete, and then allow the KafkaReceiver to commit its offsets (via the provided ReceiverOffset.acknowledge callback), before the KafkaReceiver closes its underlying kafka consumer. dispose seems to propagate upstream immediately, canceling all buffered work... and it also induces the KafkaConsumer to be closed before the offsets associated with that work can be committed.

I think this might naturally take the form of arranging for the request signals flowing upstream to the KafkaReceiver to be disabled at shutdown time, but I haven't found a simple way to inject that behavior into the wiring. I'm using a pretty hacky solution that involves publish to a ConnectableFlux, with a bogus consumer that stops requesting ... but this seems pretty heavyweight and unnatural. Any suggestions?
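One lighter-weight shape for the shutdown signal (a generic operator-level sketch, not reactor-kafka-specific): `takeUntilOther` cancels the upstream when the signal fires, stopping new demand toward the receiver, but completes the downstream normally, so already-emitted in-flight work still runs through the later stages and terminal hooks (where offsets would be committed) still execute.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class GracefulStop {
    public static boolean run() throws InterruptedException {
        Sinks.Empty<Void> shutdown = Sinks.empty();
        CountDownLatch completed = new CountDownLatch(1);
        Flux.interval(Duration.ofMillis(10))          // infinite, like receiver.receive()
            .takeUntilOther(shutdown.asMono())        // cancel upstream on signal
            .concatMap(i -> Mono.just(i))             // in-flight processing stage
            .subscribe(i -> {}, err -> {}, completed::countDown);
        Thread.sleep(100);
        shutdown.tryEmitEmpty();                      // request graceful stop
        // downstream COMPLETES (rather than being cancelled), so completion
        // hooks still fire
        return completed.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());   // true
    }
}
```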

3 replies

Hi everyone, I have been using Mono.zip a lot in my application, and since I come from a Node.js background I was thinking it works like Promise.all, but I recently found that Mono.zip won't run if one of the monos completes empty:


fun main() {
    val m1 = Mono.just("A")
    val m2 = Mono.just("B")
    val m3 = Mono.empty<String>()

    Mono.zip(m1, m2, m3).map {
        println("Data t1: ${it.t1} t2: ${it.t2} t3: ${it.t3}")
    }.block()
}

It doesn't print anything.


fun main() {
    val m1 = Mono.just("A").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m2 = Mono.just("B").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m3 = Mono.empty<String>().map { Optional.of(it) }.defaultIfEmpty(Optional.empty())

    Mono.zip(m1, m2, m3).map {
        println("Data t1: ${it.t1.get()} t2: ${it.t2.get()} t3: ${it.t3.orElse(null)}")
    }.block()
}

My requirement is that I have a couple of monos and I need the output of all of them even if one is empty. If any of the monos fails, it can throw an error.

@OlegDokuka any view regarding this, buddy?

1 reply
Salathiel Genèse
I think you're doing it right, Zakir, since a null value will result in an empty mono.
3 replies
Daniel Wahlqvist
I have a flux and I want to dynamically tell it how to transform the data. My idea is to join the flux with another "command flux" and adapt the processing depending on the last command. Is this idea similar to a real, tested pattern someone else has used? I suspect you guys can give me some hints. What I need to do now is figure out how to implement this, and I need to validate whether it's a good pattern at all.
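A sketch of one way to express "pair each data element with the most recent command": combine the two streams with `Flux.combineLatest`. The command flux is listed first so its latest value is cached before data starts flowing. Caveat: combineLatest also re-emits when a NEW command arrives while data is idle; add `distinctUntilChanged` or sampling if that matters for your case.

```java
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;

public class CommandedTransform {
    public static List<Integer> run() {
        // the "command flux": each command is a transformation to apply
        Flux<Function<Integer, Integer>> commands =
                Flux.<Function<Integer, Integer>>just(x -> x * 10);
        Flux<Integer> data = Flux.range(1, 4);
        // each data element is combined with the latest command seen so far
        return Flux.combineLatest(commands, data, (cmd, d) -> cmd.apply(d))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```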
3 replies
Daniel Wahlqvist
I've been reading the API documentation and feel a little overwhelmed but also impressed and curious. Now I'm wondering; what is Reactor? Sure, I get it's a reactive programming framework but what I haven't anticipated is what such a framework must evolve into, if it's successful. I didn't realize this before but now I believe a successful reactive programming framework MUST in the long run evolve into a successful stream processing framework. That might be a strange claim, and I know that's not what many people believe. E.g. this attitude is common: https://www.tikalk.com/posts/2017/12/06/streaming-vs-reactive/ Given all this; what is Reactor and what is it evolving into?
Salathiel Genèse
That article just sparks confusion