Mantas Matūzas
@mantoshelis
Hello, can somebody explain whether this code block is blocking or not? If yes, how should I integrate ObjectMapper into the call chain correctly?
    return Mono.just(apiErrorTranslator.translate(throwable))
        .handle(
            (errorResponse, sink) -> {
              log.error("Error occurred while processing request", throwable);

              var httpResponse = exchange.getResponse();

              httpResponse.setRawStatusCode(errorResponse.getStatus());
              httpResponse.getHeaders().setContentType(MediaType.APPLICATION_JSON);

              try {
                var bytes = objectMapper.writeValueAsBytes(errorResponse);
                var buffer = httpResponse.bufferFactory().wrap(bytes);
                sink.next(buffer);
              } catch (JsonProcessingException e) {
                sink.error(e);
              }
            });
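A minimal sketch of one non-blocking arrangement, assuming the same objectMapper, httpResponse, and errorResponse as above (note that Jackson serialization is CPU-bound rather than I/O-blocking, so this mostly just moves it out of the assembly path):

    return Mono.fromCallable(() -> objectMapper.writeValueAsBytes(errorResponse))
        .map(bytes -> httpResponse.bufferFactory().wrap(bytes))
        .flatMap(buffer -> httpResponse.writeWith(Mono.just(buffer)));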
8 replies
Zakir
@Zaky7
Does anyone have suggestions regarding a job scheduler like Quartz for the Reactive universe? I have used Quartz in Spring MVC, but in my current WebFlux setup I use Reactive Cassandra. I need a distributed job scheduler for cron jobs.
Jonathan Gray
@jongray
Hi, I'm using the reactor-rabbitmq RabbitFlux Receiver and consumeManualAck with default ConsumeOptions. We have auto-recovery enabled on the RabbitMQ connection, but we faced a network connection issue which appears to have been auto-recovered, yet no further consumption occurred. I see the default ConsumeOptions exception handler retries for 10 seconds; what happens when that time passes? Is it advisable to set that time to a very large value if I know that auto-recovery is there?
FlumeEzra
@FlumeEzra
Hey everyone! I've asked this question on the RSocket Gitter already, but they said it's rather related to Reactor, so now I'm here.
    // Variant 1: emits one element per second; each is sent to the client as it is produced
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return Flux.interval(Duration.ofMillis(1000))
                .map(time -> {
                    System.out.println("Creating...");
                    return DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now());
                });
    }

    // Variant 2: the sleep loop blocks the subscribing thread, so nothing is
    // flushed to the client until all 10 elements have been produced
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return Flux.create(fluxSink -> {
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("Creating...");
                    Thread.sleep(5000);
                    fluxSink.next(DefaultPayload.create(String.format("Number %s", i)));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
How come the first flux creates one element and sends it to the client right away, while the second one generates all 10 elements and only then sends them to the client? Also, how would I go about doing something similar to the first flux? Please help.
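One hedged sketch of getting the second flux's content with the first one's streaming behavior: replace the sleep loop with delayElements, which paces emissions on a timer instead of blocking the thread (same DefaultPayload as above):

    return Flux.range(0, 10)
            .delayElements(Duration.ofSeconds(5))   // non-blocking pacing
            .map(i -> DefaultPayload.create(String.format("Number %s", i)));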
ken
@kenmacd:matrix.org
[m]
I'm trying to understand the Context stuff and wondering how/if it works deep in library code. I guess I might be asking whether a ThreadLocal is set for the time when something is running.
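For reference, a minimal sketch of how the Context travels: it rides along with the subscription rather than a ThreadLocal, and code deep in a chain reads it at subscription time via deferContextual:

    Mono<String> greeting = Mono.deferContextual(ctx ->
            Mono.just("Hello " + ctx.getOrDefault("user", "anonymous")));

    greeting.contextWrite(Context.of("user", "ken")) // reactor.util.context.Context
            .subscribe(System.out::println);         // prints "Hello ken"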
Manish Malik
@malikmnsh
I am moving from Akka actor reactive programming to WebFlux. Is there any documentation or reference that can help? Thanks in advance.
Zane XiaoYe
@Zane-XY

Hi,

Mono<Address> address = ..;
Mono<Person> person = ..;
Mono.zip(address, person).flatMap(t -> {...});

Both address and person could be empty, and if either is empty I need to log a warning. Where should I put the log.warn()?
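One possible placement, as a sketch: attach the warning to each source before zipping, since zip completes empty as soon as either input is empty:

    Mono<Address> addressOrWarn = address.switchIfEmpty(
            Mono.defer(() -> {
                log.warn("address was empty");
                return Mono.empty();
            }));
    // likewise for person, then Mono.zip(addressOrWarn, personOrWarn).flatMap(...)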

3 replies
Zane XiaoYe
@Zane-XY
Hi, how can I build a PDF or EPUB from https://github.com/reactor/reactor-core/tree/main/docs/asciidoc? Are there any existing commands?
12 replies
Brigen Tafilica
@BigyG_gitlab
Hello everyone.
I am trying to read all accounts from a Mongo collection (a LOT, like 1 million, and this is mandatory),
then I need to do some I/O-bound operations on each account and then update them in the Mongo collection again.
Can someone help me with the best approach (maybe some demo code) to do this? I'm struggling with the performance.
Also, we might want to scale vertically, going from 32 CPU cores to 64. I assume that since we have I/O-bound processing, scaling vertically will not help much, or am I wrong? Is there an implementation that takes this scaling into consideration too? Thanks a lot in advance.
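A rough sketch of one approach, assuming Spring Data's ReactiveMongoTemplate, an Account entity, and a hypothetical process() step: stream everything, transform with bounded concurrency, write back:

    template.findAll(Account.class)                               // template is a stand-in ReactiveMongoTemplate
            .flatMap(account -> Mono.fromCallable(() -> process(account))
                    .subscribeOn(Schedulers.boundedElastic())     // keep the work off the event loop
                    .flatMap(template::save),
                    64)                                           // cap in-flight documents
            .then()
            .subscribe();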
Pieter Martin
@pietermartin

Hi,
I am struggling to understand the parallel behavior.

    @Test
    public void test() throws InterruptedException {
        Flux.range(1, 1_000)
                .subscribeOn(Schedulers.parallel())
                .subscribe((a) -> {
                    System.out.println(Thread.currentThread().getName() + " " + a);
                });
        Thread.sleep(10_000_000);
    }

Output

parallel-1 1
parallel-1 2
parallel-1 3
...
parallel-1 998
parallel-1 999
parallel-1 1000

I was expecting the subscribe function to execute on many threads, depending on my CPU.
Instead everything executes on parallel-1.
I suspect I'm doing something wrong?
Thanks
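For reference: subscribeOn pins the whole sequence to a single parallel worker. To fan out across cores, the usual shape is parallel() plus runOn, sketched here:

    Flux.range(1, 1_000)
            .parallel()                       // one rail per CPU core by default
            .runOn(Schedulers.parallel())     // each rail gets its own worker
            .subscribe(a -> System.out.println(Thread.currentThread().getName() + " " + a));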

1 reply
Ketan Vishwakarma
@ketankvishwakarma
What is the reactive equivalent of SecurityContextHolder.getContext().getAuthentication()?
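In Spring Security's reactive stack this is ReactiveSecurityContextHolder, which yields a Mono instead of reading a ThreadLocal:

    Mono<Authentication> auth = ReactiveSecurityContextHolder.getContext()
            .map(SecurityContext::getAuthentication);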
Zakir
@Zaky7

Hi, what is the recommended concurrency in the case of flatMap? I bumped into this question, which says the default value is 256:
https://stackoverflow.com/questions/62558749/project-reactor-backpressure-issue

My question is: in which scenarios do we need to explicitly handle concurrency in flatMap?
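For reference, a sketch of the knob in question: flatMap's second argument caps how many inner publishers are subscribed at once (256 by default), and lowering it matters when each inner call is expensive; webClient and urls here are stand-ins:

    Flux.fromIterable(urls)
            .flatMap(url -> webClient.get().uri(url).retrieve().bodyToMono(String.class),
                    16) // at most 16 requests in flight
            .subscribe();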

13 replies
Zakir
@Zaky7

@OlegDokuka any suggestions?

Mico Piira
@micopiira
Is it possible to use BlockHound inside real Tomcat? I just get a bunch of IllegalAccessErrors when trying to initialize BlockHound in a web application running inside Tomcat.
Zane XiaoYe
@Zane-XY
Should I use Mono for non-async code?
Sometimes I just want to use it as an Optional for easier composition. Should I use Mono as an Optional?
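A sketch of the trade-off: Mono can stand in for Optional when it keeps a chain uniform, but it adds subscription machinery that Optional doesn't carry (findUserOrNull is a stand-in):

    Mono<User> user = Mono.justOrEmpty(findUserOrNull(id)); // empty Mono if null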
Burim Krasniqi
@burimkrasniqi
senderOptions = senderOptions
    .producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "SampleTxn");
KafkaSender.create(senderOptions)
           .sendTransactionally(source.map(r -> Flux.fromIterable(transform(r))))
           .concatMap(r -> r)
           .doOnError(e -> log.error("Send failed, terminating.", e))
           .doOnNext(r -> log.debug("Send completed {}", r.correlationMetadata()));
The Kafka producer cannot be reused when I use it with transactions. It complains that the same transactionId cannot be used in different calls at the same time (it throws an exception only in the case of parallel calls). I can solve this by creating different producers with different transactionIds, but that is not efficient, as it consumes more threads internally. Is there an alternative that lets me use the same Kafka producer (KafkaSender.create(senderOptions)) with different transactionIds, without having to create and close it each time?
1 reply
aconst-null
@aconst-null
Hi, a quick question. Let's say I've got a flux which will produce millions of entries. Each entry has some id: if we have not seen this id yet in the flow, we need to do some extra work (e.g. retrieve something); otherwise we just get on with processing the entry. There is a large number of ids, so groupBy doesn't seem right. What is the Reactor way to deal with this? I could use some state, like a set, to figure this out, but I'm wondering if there is a more idiomatic approach.
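A sketch of the stateful-but-simple route (retrieveExtra and process are stand-ins): Set#add returns true only on first sight of an id:

    Set<String> seen = ConcurrentHashMap.newKeySet();
    flux.concatMap(entry -> seen.add(entry.id())
            ? retrieveExtra(entry).then(process(entry)) // first occurrence: extra work
            : process(entry));                          // seen before: just process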
Brigen Tafilica
@BigyG_gitlab

Bumping my question.
Hi, it's me again; sorry to bother everyone.

I am trying to read all accounts from a Mongo collection (a LOT, like 1 million, and this is mandatory), then I need to do some I/O-bound operations on each account (sum all the values inside a big array, plus other arithmetic such as multiplication and Math.pow) and then update them in the Mongo collection again.
Can someone help me with the best approach (maybe some demo code) to do this? I'm struggling with the performance.
Also, we might want to scale vertically, going from 32 CPU cores to 64. I assume that since we have I/O-bound processing, scaling vertically will not help much, or am I wrong? Is there an implementation that takes this scaling into consideration too? Thanks a lot in advance.

josh gruenberg
@joshng

hi there. I'd appreciate some advice: I have a flux of timestamped events (from kafka), and I need to collate them into event-time windows, with a configured accumulation timeout. the events aren't strictly ordered, but I can tolerate windows closing prematurely: stragglers that arrive after their previously-open window has closed can just be allocated into a new, separate window for the same time period.

I seek an appropriate strategy for achieving this without leaking resources or degrading performance as the cardinality of event-time windows grows without bound.

at first glance, something like the following seems like it might work:

events.groupBy(event -> event.timestamp().truncatedTo(windowDuration))
        .flatMap(group -> group.window(windowAccumulationTimeout))

however, I'm concerned about the implications of groupBy for that unbounded stream of groups. I suspect this might be problematic because each group will remain in the groupBy state forever, even after the window-timeout expires for the last straggling event in each window.

am I wrong to be concerned about this? is there a better way to do what I need?
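One hedged sketch, assuming that completing a group evicts its key from groupBy so a straggler simply opens a fresh group for the same key (ChronoUnit.MINUTES and process stand in for your window settings):

    events.groupBy(e -> e.timestamp().truncatedTo(ChronoUnit.MINUTES))
            .flatMap(group -> group
                    .take(windowAccumulationTimeout) // completes, and thus evicts, the group
                    .collectList())
            .subscribe(window -> process(window));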

7 replies
Salathiel Genèse
@SalathielGenese

Hey guys

I trust you all are thriving.

For reusability concerns, I run my validation on the service layer, which returns Mono.error( constraintViolationException )...

So that my web handlers merely forward the unmarshalled domain to the service layer.

So far, so great.


But how do I advise (AOP) my web handlers so that they return HTTP 422 with the formatted constraint violations?

WebExchangeBindException only handles exceptions thrown synchronously.

My AOP advice triggers an error because:

  • my web handler returns Mono<DataType>
  • but my advice returns a ResponseEntity

And if I wrap my response entity (from the advice) into a Mono<ResponseEntity>, I get an HTTP 200 OK with the response entity serialized :(

Salathiel Genèse
@SalathielGenese
The link to the SO question, with code excerpt - https://stackoverflow.com/q/69519724/3748178
Zane XiaoYe
@Zane-XY
Hi, I'm working on rewriting a periodic pulling job (using the @Scheduled annotation) into a reactive stream. The system pulls messages from a message queue (not Kafka). The client is thread-safe, and the previous implementation used multiple clients/threads to do the pulling. After the messages are pulled, we need to do some processing (enrichment, sending to another queue), then ack back to the message queue.
What is the equivalent operator/API to do the same work using Reactor?
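A rough sketch under stated assumptions (queueClient, enrich, and forward are stand-ins for the thread-safe client and the processing steps): a repeating deferred pull replaces the @Scheduled loop, with the blocking pull isolated on boundedElastic:

    Flux.defer(() -> Mono.fromCallable(queueClient::pull))
            .subscribeOn(Schedulers.boundedElastic())
            .repeat()                                // pull again once the previous completes
            .flatMap(msg -> enrich(msg)
                    .flatMap(this::forward)          // send to the next queue
                    .then(Mono.fromRunnable(() -> queueClient.ack(msg))))
            .subscribe();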
Daniil Mikhaylov
@mdsina

Hi there!
I have a question. Why is it recommended to run blocking code in Mono.fromRunnable() or Mono.fromCallable() rather than in Mono.defer() or mono.flatMap()?
I mean, if I look at the sources of Mono.fromRunnable(), it actually does much the same as Mono.defer():

    public void subscribe(CoreSubscriber<? super T> actual) {
        MonoRunnableEagerSubscription s = new MonoRunnableEagerSubscription();
        actual.onSubscribe(s);
        if (s.isCancelled()) {
            return;
        }
        try {
            run.run();
            actual.onComplete();
        } catch (Throwable ex) {
            actual.onError(Operators.onOperatorError(ex, actual.currentContext()));
        }
    }

The blocking code runs in the subscribe() method. Doesn't the Supplier in Mono.defer() do the same?
Does it matter if I switch threads through the .subscribeOn() operator?
MonoSubscribeOn also gives me pause. It does worker.schedule(() -> s.request(n)); on the parent when subscribe happens, but I can't find why request() is scheduled on the worker and not subscribe()?
Thanks
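For reference, a sketch of the comparison being asked about (blockingCall is a stand-in): both run the lambda at subscription time, and neither moves work off the caller's thread without subscribeOn; the practical difference is only what the lambda returns:

    Mono<String> viaCallable = Mono.fromCallable(() -> blockingCall())
            .subscribeOn(Schedulers.boundedElastic());
    Mono<String> viaDefer = Mono.defer(() -> Mono.just(blockingCall())) // blocks while assembling the Mono
            .subscribeOn(Schedulers.boundedElastic());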

Red444
@MiloS444

Hi all,

I have a function that deletes a user both from the database and Keycloak. Sometimes it happens that the user gets deleted from the database but remains in Keycloak. Does anyone have an idea what the problem could be? It only happens from time to time, but I don't know why.

        return _companyRepository.findById(companyId)
            .flatMap(companyDocument -> {
                final List<Employees> employees = companyDocument.getEmployees();
                if (employees != null && !employees.isEmpty())
                {
                    final boolean isRemoved =
                        employees.removeIf(employee -> employee.getId().equals(employeeId));
                    if (isRemoved)
                    {
                        companyDocument.setEmployees(employees);
                        return _companyRepository.save(companyDocument).thenReturn(employeeId);
                    }
                }
                return Mono.just(employeeId);
            })
            .flatMap(employeeId -> _keycloakService.deleteUser(employeeRealm, employeeId))
            .onErrorResume(e -> Mono.empty());
Salathiel Genèse
@SalathielGenese
And a StackOverflow +50 bounty
Zakir
@Zaky7

Hi, I am using SSE in a Spring WebFlux application. I have created an SSE endpoint that gets data from Redis pub/sub, and the Redis publisher gets data from an external WebSocket connection.

My problem is that when there is no data from the WebSocket, then as soon as I try to connect to my SSE endpoint the connection terminates due to the unavailability of data. My hunch is that I need to send comment heartbeats when data is not coming. How can I do that?

@PostConstruct
private fun realtimeSinkInitializer() {

    /**
     * onBackpressureBuffer by default cancels the subscription when the last
     * client closes; we set autoCancel to false to avoid that.
     */
    realtimeSink = Sinks.many().multicast().onBackpressureBuffer(1000, false)

    reactiveMsgListenerContainer.receive(weatherTopic)
        .onBackpressureBuffer(1000)
        .map(ReactiveSubscription.Message<String, String>::getMessage)
        .map { msg ->
            objectMapper.deserialize<List<WeatherDto>>(msg)
        }
        .subscribe(
            { weatherData: List<WeatherDto> ->
                realtimeSink.tryEmitNext(weatherData)
            },
            { err ->
                logger.error(err) { "Error occurred on the Redis subscription channel" }
            }
        )
}

fun getRealtimeWeather(): Flux<ServerSentEvent<Map<String, WeatherDto>>> {
    return realtimeSink.asFlux()
        .map { e -> ServerSentEvent.builder(e).build() }
        .onErrorResume { e ->
            logger.error(e) { "Error getting realtime weather data" }
            Flux.error(InternalServerException())
        }
}
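One common fix, sketched in Java with the names above: merge a periodic comment-only event into the stream so the connection stays busy while the sink is quiet; SSE clients ignore comment frames:

    Flux<ServerSentEvent<Object>> heartbeat = Flux.interval(Duration.ofSeconds(15))
            .map(i -> ServerSentEvent.builder().comment("keep-alive").build());
    return Flux.merge(
            realtimeSink.asFlux().map(e -> ServerSentEvent.builder((Object) e).build()),
            heartbeat);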
2 replies
Mantas Matūzas
@mantoshelis
Hello, I'm trying to integrate reactive and non-reactive code. The use case is to have a non-reactive event listener call a reactive service. Do you think it is fine to do something like this?
  @EventListener
  public void handle(TheEvent event) {
    reactiveEventService.handle(event).subscribe();
  }
Salathiel Genèse
@SalathielGenese

First...

The subscribe() method returns a Disposable... If you do not keep a reference to that object, when do you call its dispose() method?

Second...

Even if you do, it doesn't really make the whole thing reactor-compliant

What you need (I think)
is a publisher [maybe within your service] to which you can PUSH TheEvent instances as they come in.
The handle(TheEvent) method from your excerpt pushes events and does not subscribe to anything.
Salathiel Genèse
@SalathielGenese
Your reactiveEventService will be responsible for subscribing to that publisher OR, if it exposes that publisher (Mono, Flux or else), whatever code wants to listen can subscribe to it.
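A sketch of that shape (names are illustrative): the listener pushes into a sink, and the service owns the single long-lived subscription:

    private final Sinks.Many<TheEvent> events =
            Sinks.many().multicast().onBackpressureBuffer();

    @EventListener
    public void handle(TheEvent event) {
        events.tryEmitNext(event); // push only; no subscribe() per event
    }

    @PostConstruct
    void wire() {
        events.asFlux()
                .flatMap(reactiveEventService::handle)
                .subscribe(); // one subscription, one Disposable to manage
    }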
Mantas Matūzas
@mantoshelis
@SalathielGenese makes sense. I will try it.
Salathiel Genèse
@SalathielGenese
+1
Anand Jaisy
@anandjaisy
```
public DiscountCommand create(Mono<String> insertId, DiscountCommand discountCommand) {
    return insertId.map(item -> new DiscountCommand(
            item,
            discountCommand.discountCode(),
            discountCommand.discountType()))
        .block(Duration.of(1000, ChronoUnit.MILLIS));
}
```
If insertId is null, it throws a NullPointerException and the application hangs indefinitely. How can I handle the error?
Salathiel Genèse
@SalathielGenese
1./ You may want to annotate your parameter as non-null

2./ You can use if/else OR Optional

Optional.ofNullable( insertId )
  .map( __ -> insertId.map( item -> new /* ... */ ) )
  .orElseGet( Mono::empty )
  .block(/* duration */);

Something like that (untested)

9 replies
Raghu
@code-uri
Hello, how can I emit an item received through a REST endpoint without a blocking call to emit? I want to emit the item to a sink and return the item as the response.
```
Sinks.Many<String> inputSink = Sinks.many().multicast().onBackpressureBuffer();

public Mono<String> process(String uid) {
    inputSink.emitNext(uid, (signalType, emitResult) -> false);
    return Mono.just(uid);
}
```
Raghu
@code-uri
Basically, I want to process the item received via the REST endpoint asynchronously, and subscribe to the processed items on another thread as shown below. I am subscribing to inputSink, processing each item, and emitting to outputSink.

Sinks.Many<String> outputSink = Sinks.many().unicast().onBackpressureBuffer();

public Flux<String> getProcessedItems() {
    return outputSink.asFlux();
}

Is this approach correct?
ruslan
@unoexperto

Hi folks. I'm trying to use Sinks.many().unicast().onBackpressureBuffer() for the first time and I'm getting an unexpected result. Here is how I use it:

// on Thread B
flushingSink.tryEmit(...)

flushingSink.asFlux()
  .subscribeOn(THREAD-POOL-A)
  .map {
    // doing work here
    // executed on Thread B, instead of THREAD-POOL-A
  }

I see that flushingSink.tryEmit is blocking AND queued messages are processed on the calling thread. I thought they would be queued asynchronously. What am I doing wrong?
Thanks!!
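For reference, a sketch of the usual fix: subscribeOn only moves the subscription signal, while values pushed through a sink run downstream on the emitting thread until you hop with publishOn (doWork is a stand-in):

    flushingSink.asFlux()
            .publishOn(Schedulers.boundedElastic()) // downstream leaves the emitter thread
            .map(v -> doWork(v))
            .subscribe();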

3 replies
ruslan
@unoexperto

Does anybody know if passing Mono/Flux around the project is the right way of doing chaining? It seems like I'm losing performance on all these Reactor wrappers. In the profiler I see a call stack like this one: https://snipboard.io/fXT6KW.jpg

I see calls as deep as 150 nested Reactor invocations.

1 reply
ruslan
@unoexperto
Hi folks! What is the right answer to this question? https://stackoverflow.com/questions/31185129/parallel-dispatch-of-groupby-groups-in-reactor
Basically I'm trying to do the same: I want to process some work in parallel, always preserving the original order per group (even if a group changes threads later).
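A sketch of one shape that fits (Item::key and process are stand-ins): give each group its own rail, so groups run concurrently while concatMap preserves order within a group:

    source.groupBy(Item::key)
            .flatMap(group -> group
                    .publishOn(Schedulers.parallel())   // each group hops to a worker
                    .concatMap(item -> process(item)))  // per-group order preserved
            .subscribe();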
6 replies
Dmitry
@dshuplyakov
Hi there!
I want to read a message and then execute an HTTP request. If the HTTP request returns 2xx, ack the message; otherwise, nack it. But the problem is that flatMap returns Mono<ResponseEntity> and I can't pass the original AMQP message to map.
RabbitFlux.createReceiver().consumeManualAck(QUEUE_NAME, consumeOptions)
        .flatMap(amqpMessage -> {
            String url = new String(amqpMessage.getBody());
            return webclient.get().uri(url).retrieve().toBodilessEntity();
        })
       .map(responseEntity -> {
           if (responseEntity.getStatusCode().is2xxSuccessful()) {
               // how to pass here amqpMessage?
               amqpMessage.ack();    
           } else {
               amqpMessage.nack();
           }
           return amqpMessage;
        }).subscribe();
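A sketch of one way out: compose the response handling inside the flatMap, where amqpMessage is still in scope (the nack requeue flag is an assumption):

    RabbitFlux.createReceiver().consumeManualAck(QUEUE_NAME, consumeOptions)
            .flatMap(amqpMessage -> {
                String url = new String(amqpMessage.getBody());
                return webclient.get().uri(url).retrieve().toBodilessEntity()
                        .map(responseEntity -> {
                            if (responseEntity.getStatusCode().is2xxSuccessful()) {
                                amqpMessage.ack();
                            } else {
                                amqpMessage.nack(true); // requeue; flag assumed
                            }
                            return amqpMessage;
                        });
            })
            .subscribe();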
1 reply
harshdave840
@harshdave840
Hi everyone. I have two hot fluxes running in parallel and one mono. I want the mono to always execute after both fluxes have completed. I tried to concat the first flux and the mono, but the mono executes immediately after the first flux has completed. How do I make sure that the mono always runs after both parallel fluxes have completed? Below is just an example.
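A sketch of one way to gate it (hotFlux1, hotFlux2, and finalMono are stand-ins): Mono.when completes only after every source completes:

    Mono.when(hotFlux1, hotFlux2)
            .then(finalMono)
            .subscribe();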