    kaladhar
    @kaladhar-mummadi
    Do you guys think multicast is the solution here?
    mplain
    @mplain
    speaking of, it's kinda weird that there are no doOnXX operators exposing the context
    especially seeing how transformDeferredContextual gives an example of using the context to log stuff
    1 reply
    kaladhar
    @kaladhar-mummadi
    Hi,
    Is there a way to skip the stacktrace (the 2nd part of the log, with all the reactor.core.publisher details) in logs when using ReactorDebugAgent.init()?
    Thanks in advance
    Mritunjay Dubey
    @mddubey
    Hi! I posted a question about reactor-rabbitmq on Stack Overflow, in case someone can help me out:
    https://stackoverflow.com/questions/65068655/how-to-work-with-dead-letter-quueus-with-reactor-rabbitmq
    Bakulin Efim
    @efimbakulin
    Hey there!
    Is it OK that Mono receives a cancel signal after onComplete? What may lead to this situation?
    2 replies
    malone
    @malone081021
    is there a hook for when a thread hop happens with publishOn or subscribeOn? or does Reactor intercept thread switches?
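    There is a scheduler-level hook for this; a minimal sketch, assuming SLF4J's MDC is the state to carry across the hop. Schedulers.onScheduleHook decorates every Runnable handed to Reactor's schedulers:

        Schedulers.onScheduleHook("mdc", runnable -> {
            // capture the MDC of the thread that submits the task
            Map<String, String> captured = MDC.getCopyOfContextMap();
            return () -> {
                Map<String, String> previous = MDC.getCopyOfContextMap();
                if (captured != null) MDC.setContextMap(captured);
                try {
                    runnable.run();
                } finally {
                    // restore whatever the worker thread had before
                    if (previous != null) MDC.setContextMap(previous);
                    else MDC.clear();
                }
            };
        });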
    please take a look at the above query; I am trying to use an AWS read replica with Spring R2DBC
    but I am getting an unknown host error
    Sunny Shaw
    @sunny-shaw
    Hi, I am not sure what is wrong with the code snippet below; I'm not able to get the correct output.
    Can someone point out what is wrong here?
    @Test
    fun stringBuilderWithWebFLux() {
        val builder = StringBuilder()

        builder.appendln("Initial")

        val list = listOf(1, 2, 3)

        val res = Flux.fromIterable(list)
            .flatMap {
                apiCall(it).map { builder.appendln(it) }
            }.then(builder.toString().toMono())

        StepVerifier.create(res)
            .consumeNextWith {
                print(it) // prints "Initial" instead of Initial \n Value 1 \n Value 2 \n Value 3
            }.verifyComplete()
    }

    private fun apiCall(it: Int?): Mono<String> {
        return "Value $it".toMono()
    }
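    A likely culprit, sketched in Java over the same shape: builder.toString() inside then(...) is evaluated eagerly when the chain is assembled, before any element flows, so the resulting Mono only ever carries "Initial". Deferring the call picks up the appended values:

        Mono<String> res = Flux.fromIterable(list)
                .flatMap(i -> apiCall(i).map(builder::append))
                // fromCallable defers toString() until the Flux has completed
                .then(Mono.fromCallable(builder::toString));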
    1 reply
    Dmitri Maximovich
    @maximdim_twitter

    Hi. I wonder if my code is correct. I have a method that produces a Flux and I need to pick one element from it, or do something if the Flux is empty (generate an error). So far I have the following:

            service.getFoo("foo")
                    .sort((Comparator.comparing(Foo:date)))
                    .next()
                    .switchIfEmpty(Mono.error(NotFoundException::new))
                    .doOnNext(System.out::println)
                    .subscribe();

    It seems to work fine but I'm seeing the following ERROR in the log output:

    2021-02-23 15:23:22 [main] ERROR [] reactor.core.publisher.Operators - Operator called default onErrorDropped
    reactor.core.Exceptions$ErrorCallbackNotImplemented: com.idexx.isc.realtime.server.session.ClientNotFoundException
    Caused by: NotFoundException: null
    ...
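    The onErrorDropped happens because subscribe() was called with no error consumer, so the NotFoundException has nowhere to go; a minimal sketch of the fix:

        service.getFoo("foo")
                .sort(Comparator.comparing(Foo::date))
                .next()
                .switchIfEmpty(Mono.error(NotFoundException::new))
                .subscribe(
                        System.out::println,                             // onNext
                        e -> System.err.println("lookup failed: " + e)); // onError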
    2 replies
    Ivan Rodrigues
    @ivanzitojr_twitter

    Hi folks!!

    I'm new to reactor and I ran into an issue.
    In the following code:

        val flux = range(1, 10)
            .log()
            .doOnCancel { countDown.countDown() }
            .doOnComplete { countDown.countDown() }
            .delayElements(Duration.ofSeconds(1))
            .doOnNext {
                val state = repository.find(receipt.movementIdentification)
                logger.info("State = $state")
                if (state == "CANCELED") throw PixCanceledException("server.error", "Pix cancelled")
                if (state == "FINISHED") countDown.countDown()
            }
            .map { receipt }
            .collectList()
            .map { it.component1() }
            .publishOn(Schedulers.boundedElastic())

        flux.subscribe()
        countDown.await()

    In the code above I want to finish/cancel the subscription. How can I do this?
    Looking at some questions on Stack Overflow, by the way, I saw that doing this is discouraged. In my case I could throw an exception and it would work, but I want to do the work the right way.
    Reading the documentation, it doesn't say specifically how to send a signal from an onNext step; it mostly explains signals sent, for example, when an error occurs or when the subscription is cancelled outside the producer.
    I sincerely hope I managed to express myself.
    Thanks!!
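    A sketch of ending the sequence from inside the pipeline without throwing: takeUntil emits the matching element, then cancels upstream and completes downstream. repository and receipt are the poster's own names, assumed here:

        Flux.range(1, 10)
                .delayElements(Duration.ofSeconds(1))
                .map(i -> repository.find(receipt.movementIdentification))
                .doOnNext(state -> logger.info("State = " + state))
                // completes (rather than errors) once a terminal state shows up
                .takeUntil(state -> "FINISHED".equals(state) || "CANCELED".equals(state))
                .subscribe();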

    4 replies
    gtay003
    @gtay003
    Hi all, I've recently been switching some of the code in our app that's currently using the Processor API over to using the new Sinks API instead. The new API seems much more intuitive and well thought out, thanks for all your hard work! One thing we've been hit by is the difference between a call to processor.sink() which provides a serialised sink that gates/serialises concurrent calls to onNext(), and the Sinks.many() factory methods that fail with EmitResult.FAIL_NON_SERIALIZED when multiple producers emit concurrently. I can understand the rationale for this, and found this SO answer which suggests a 'wait and retry' failure handler to work around the issue, and this seems to be working for us. My first question is: is this a reasonable way to solve this problem; and second: are there any plans to provide some additional factory methods or decorators to do this with an API call without needing a custom failure handler? I am thinking something like Sinks.many().unicast().serialised() or something similar? Thanks in advance!
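    For reference, the wait-and-retry handler can be a one-liner; a sketch, noting that reactor-core 3.4.10+ also ships Sinks.EmitFailureHandler.busyLooping(Duration) for roughly this purpose:

        Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
        // returning true from the handler means "retry the emission"
        sink.emitNext("value",
                (signalType, result) -> result == Sinks.EmitResult.FAIL_NON_SERIALIZED);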
    2 replies
    Mritunjay Dubey
    @mddubey

    Hello People,

    Going to be a long read; :(

    We're working with Reactor on WebFlux and having some performance issues when we load test with a lot of concurrent users (15-20K). We have two basic questions whose answers could possibly help us improve the performance of our systems:

    • Any specific reason the CoreSubscriber methods are void and not Mono<Void>? Does that mean whatever code we write there will be blocking? We are using slf4j MDC to pass a trace-id into all the logs. For this we have a similar approach to the one mentioned here: we implement a core-subscriber that copies the trace-id into the MDC from the current context. We see a lot of CPU time spent in this method itself. Please let us know if we are doing something wrong (see the sketch after this list).

    • How does running tasks on a scheduler help if we have a single CPU? In our case we are running 10 kubernetes containers with "1000m" CPU each, which is more like a single-core machine (if I understand correctly). In that case it shouldn't matter whether we run some stuff on a scheduler, because in the end there is a single core and nothing can actually run concurrently. To test this theory I doubled the CPU in each container (2000m) and reduced the number of containers to 5; theoretically that should reduce the response time, but there was not much difference.

    Please let us know if there are any pointers on these.
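    On the first point, a common shape from the reference docs is a doOnEach helper rather than a custom CoreSubscriber; a minimal sketch, where "traceId" is an assumed context key:

        static <T> Consumer<Signal<T>> logOnNext(Consumer<T> logStatement) {
            return signal -> {
                if (!signal.isOnNext()) return;
                // copy the trace id out of the Reactor Context around this one signal
                String traceId = signal.getContextView().getOrDefault("traceId", "");
                try (MDC.MDCCloseable closeable = MDC.putCloseable("traceId", traceId)) {
                    logStatement.accept(signal.get());
                }
            };
        }

        // usage: flux.doOnEach(logOnNext(v -> log.info("received {}", v)))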

    Jeevjyot Singh Chhabda
    @jeevjyot
    Hi Folks,
    Trying to use FluxSink and I'm not sure I understand the onRequest (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/FluxSink.html#onRequest-java.util.function.LongConsumer-) method. Could anyone please shed some light on how the request count is determined?
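    The long passed to onRequest is the demand the downstream subscriber signals via Subscription.request(n), accumulated across calls; a minimal sketch:

        Flux<Long> flux = Flux.create(sink -> {
            AtomicLong counter = new AtomicLong();
            sink.onRequest(n -> {              // n = demand signalled downstream
                for (long i = 0; i < n; i++) {
                    sink.next(counter.incrementAndGet());
                }
            });
        });

        flux.subscribe(new BaseSubscriber<Long>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(3);                    // the onRequest consumer receives 3
            }
        });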
    HaloFour
    @HaloFour
    Good morning! I was wondering if anyone knew of good tools that might be used to detect if a Reactor publisher is not being used? It's a common issue in our Spring WebFlux projects that newly onboarded developers assume that they can drop a Mono<Void> on the floor as they could with a void-returning method and if possible I'd like to catch that during compilation.
    3 replies
    Archit Agarwal
    @architmmmec_twitter
    Hi Team, I am using receiverOffset.commit() in my kafka consumer (the asynchronous way). Can anyone tell me approximately how long it takes to commit the offsets? I waited for 3 min and the offsets were still not committed. Can someone help with this?
    Aleksey Vasin
    @youagree
    hello, how can I pass request attributes into a rest-controller via a WebFilter? Right now I use contextWrite to put a key/value and read it in the rest-controller with deferContextual, but it does not work
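    The usual gotcha: the Context propagates from the subscriber upward, so contextWrite must be appended after chain.filter(exchange), not to an earlier part of the chain; a sketch:

        @Component
        public class AttributeWebFilter implements WebFilter {
            @Override
            public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
                return chain.filter(exchange)
                        .contextWrite(ctx -> ctx.put("attr", "value"));
            }
        }

        // in the controller:
        // return Mono.deferContextual(ctx -> Mono.just(ctx.get("attr")));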
    Alex Mrynsky
    @amrynsky

    Just discovered unexpected behavior of Flux.cache that causes the flow to hang indefinitely.

    @Test
        void flow() {
            var flux = getSource()
                    .doOnSubscribe(__ -> log.info("Loading source..."))
                    .cache(Duration.ofSeconds(2))
                    .doOnSubscribe(__ -> log.info("Pooling cycle starting..."))
                    .flatMap(this::process, 2)
                    .repeat();
    
            StepVerifier.create(flux)
                    .expectNextCount(100000)
                    .verifyComplete();
        }
    
        private Flux<Integer> getSource() {
            return Flux.just(1, 2, 3, 4, 5);
        }
    
        private Mono<Integer> process(int channel) {
            return Mono.just(channel)
                    .doOnNext(rec -> log.info("Processing: {}", rec))
                    .delayElement(Duration.ofMillis(10));
        }

    The idea is to implement continuous data processing (i.e. fetching from the external resource) based on the source (getSource).

    I tried to use cache to repeat the inner flow for the same source while periodically reloading the source. In addition, we need to limit concurrency for the processing.

    After several iterations the flow hangs indefinitely.

    I'm still troubleshooting, but it looks like the combination of cache & flatMap with concurrency is causing this behaviour.

    1 reply
    Brigen Tafilica
    @BigyG_gitlab
    Flux.fromArray(DimRating.values())
                    .parallel()
                    .runOn(Schedulers.parallel())
                    .flatMap(dimRating ->
                            Flux.fromStream(repositoryReader.findByBatchId(batchId, dimRating))
                                    .publishOn(Schedulers.parallel())
                                    .map(document -> {
                                        Account account = repositoryReader.ConvertBsonDocument2Account(document);
                                        int rc = readCount.incrementAndGet();
                                        try {
                                            processor.process(account);
                                        } catch (InvalidProcessorConfigException e) {
                                            e.printStackTrace();
                                        }
                                        if (rc % 1000 == 0)
                                            log.warn("PROCESSED 1000 ACC");
                                        return repositoryReader.getUpdate(account);
                                    })
                                    .buffer(10000)
                                    .publishOn(Schedulers.parallel())
                                    .map(this::bulkWriteResult)
                                    .doOnComplete(() -> log.info("ECL Calculation Finished"))
                    )
                    .subscribe();
    
        public BulkWriteResult bulkWriteResult(List<Pair<Query, Update>> updates) {
            BulkWriteResult resultList = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, Account.class).updateOne(updates).execute();
            log.info("ExecuteBulkBufferedCount {} matchedCount {} modifiedCount {}", updates.size(), resultList.getMatchedCount(), resultList.getModifiedCount());
            return resultList;
        }
    THIS IS GOING TO BE A LONG READ, SORRY
    In the above code we have 10 different DimRating values. What we want to do is make a DB call for each dim rating in parallel, to find accounts by dim rating.
    In parallel because we proved that we can read faster from mongo if we read in parallel for each dimRating than with a single findAll call.
    We are talking about millions of documents (20M best case) and yes, there is no other way: we need to read and process all of them.
    Now each account needs to be processed with processor.process(account), and this is kind of a blocking method.
    Let's say the processing takes 1 sec for each account.
    Then after every account is processed we need to update it in the DB, and we thought a bulk update every 10000 would be a smart thing.
    Now I know I am doing something wrong, because the full read-process-update for 20M accounts is slower the reactive way than with the old-school approach of creating new threads and implementing concurrency imperatively.
    I would LOVE some help here pointing out what I'm doing wrong and what the best approach for this task is (see the sketch below).
    In one of my tries, since reading is faster, I read and directly emitted to a Sinks.Many unicast which was subscribed, with the process+update done in doOnNext, but maybe I'm missing something; it was still not at the performance level I was expecting.
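    A sketch of one change that likely matters: the blocking processor.process(account) call runs on the parallel scheduler and starves it; wrapping it in fromCallable on boundedElastic with an explicit concurrency cap keeps the parallel threads free. Names follow the snippet above; the 64 is an arbitrary knob to tune, not a recommendation:

        Flux.fromArray(DimRating.values())
                .flatMap(dimRating ->
                        Flux.fromStream(repositoryReader.findByBatchId(batchId, dimRating))
                                .flatMap(document -> Mono.fromCallable(() -> {
                                    // blocking work, moved off the parallel pool
                                    Account account = repositoryReader.ConvertBsonDocument2Account(document);
                                    processor.process(account);
                                    return repositoryReader.getUpdate(account);
                                }).subscribeOn(Schedulers.boundedElastic()), 64)
                                .buffer(10000)
                                .map(this::bulkWriteResult))
                .subscribe();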
    4 replies
    architmmmec
    @architmmmec

    Hi All, async commit is not working for us:
    public Flux<ReceiverRecord<String, byte[]>> consumeFromSource() {
        ReceiverOptions<String, byte[]> receiverOptions = getReceiverOptions();
        return KafkaReceiver.create(receiverOptions).receive(Integer.parseInt(prefetch))
                .map(receiverRecord -> {
                    try {
                        if (receiverRecord.receiverOffset().offset() % commitBatchSize == 0) {
                            receiverRecord.receiverOffset().commit();
                            log.debug("Committed offset {},{}", receiverRecord.receiverOffset().topicPartition(),
                                    receiverRecord.receiverOffset().offset());
                        }
                    } catch (Exception exception) {
                        log.error("Exception while committing", exception);
                    }
                    return receiverRecord;
                });
    }
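    One thing to check, sketched below: ReceiverOffset.commit() returns a Mono<Void> that does nothing until subscribed, and the snippet above drops that Mono on the floor. delayUntil keeps the commit inside the pipeline:

        return KafkaReceiver.create(receiverOptions).receive(Integer.parseInt(prefetch))
                .delayUntil(record -> {
                    ReceiverOffset offset = record.receiverOffset();
                    // commit() only runs when its Mono is subscribed to
                    return offset.offset() % commitBatchSize == 0
                            ? offset.commit()
                            : Mono.empty();
                });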

    3 replies
    Can someone help here?
    Bakulin Efim
    @efimbakulin
    Hi guys! Is it somehow possible to enrich the context for the incoming HttpRequest so that these changes are available in ContextAware httpserver metrics recorder?
    HaloFour
    @HaloFour
    What do people use for logging with Reactor/WebFlux? We're finding hidden blocking in logging frameworks that is hard to avoid entirely
    8 replies
    Oleh Dokuka
    @OlegDokuka
    image.png
    HaloFour
    @HaloFour
    I know Reactor provides its own subscriber context, but it's often difficult to bridge that with the context used by a lot of other frameworks, which often depend on thread locals. Have there been thoughts as to how to improve the interop there? I'm currently using an operator registered via Hooks.onEachOperator, but that has a noticeable impact on performance. I see Sleuth has worked around this by using Hooks.onLastOperator, but it notes explicitly that if the reactive chain changes threads at any point, any such context, like MDC, would be lost. Beyond that it seems there aren't many solutions aside from using helper methods and doOnEach, which I would suggest isn't a very friendly approach.
    Brigen Tafilica
    @BigyG_gitlab

    Hello guys, I'm having some difficulties with reactor-kafka, I hope someone can help.
    I'm using this code to read data from a topic and directly send it to another topic, and I average 15K msgs per sec:

    KafkaReceiver
                        .create(receiverOptions)
                        .receive()
                        .map(m -> SenderRecord.create(new ProducerRecord<>(outputTopic, m.key(), m.value()), m.receiverOffset()))
                        .as(sender::send)
                        .doOnNext(m -> {
                            m.correlationMetadata().acknowledge();
                        }).subscribe()

    But the real use case is that I want to process the messages and then send them to another topic. So I'm using the code below, but I'm averaging 5K msgs per sec:

    KafkaReceiver
                        .create(receiverOptions)
                        .receive()
                        .groupBy(m -> m.receiverOffset().topicPartition())
                        .publishOn(Schedulers.parallel())
                        .flatMap(m -> Mono.fromCallable(() -> SenderRecord.create(new ProducerRecord<>(outputTopic, m.key(), processor.process(m.value())), m.receiverOffset()))
                                .subscribeOn(Schedulers.boundedElastic())
                                .onErrorResume((err) -> {
                                    log.error(err.getMessage());
                                    return Mono.empty();
                                })
                                .as(sender::send)
                            .doOnNext(a -> {
                                m.correlationMetadata().acknowledge();
                            }))
                    .subscribe()

    processor.process() is some blocking method that takes a few milliseconds to complete, but I still don't think it should fall from 15K per sec to 5K

    Thanks in advance

    11 replies
    aconst-null
    @aconst-null
    Hi all. I've been using reactor for a while now and am reasonably comfortable with 'the basics'. I'm wondering if anyone has any links to docs/vids etc. around getting into slightly more advanced use cases. For example: let's say I wanted to implement something similar to Flux#limitRate. Getting into the source, this gets reasonably 'interesting' reasonably quickly (at least, for me :) ). Are there any resources you could point me to, to help with 'getting into' this next stage of Reactor usage? Thanks so much
    5 replies
    Brigen Tafilica
    @BigyG_gitlab

    Hi, it's me again, sorry to bother everyone

    I am trying to read all accounts from a mongo collection (a LOT, like 1 million, and this is mandatory),
    then I need to do some I/O-bound operations on each account (sum all values inside a big array, plus some other arithmetic operations like multiplication and Math.pow) and then update them in the mongoCollection again.
    Can someone help me with the best approach (maybe some demo code impl) to do this? I'm struggling with the performance.
    Also we might want to scale vertically, so go from 32 CPU cores to 64. I assume that since we have I/O-bound processing, scaling vertically will not help much, or am I wrong? Is there also some other impl that takes this scaling into consideration? Thanks a lot in advance

    kyussfia
    @kyussfia

    Hi all, I'm about to use windowUntil:

     [...]
     return super.getResult()
             .log("result 1")
             .windowUntil(this::hasEnoughElementForAPage, false, MAXIMUM_FINITE_ELEMENTS)
             .doOnError(e -> {
                 System.err.println("do on error");
             })
             .onErrorContinue((e, o) -> {
                 System.err.println("ON ERROR RESUME");
                 Flux.just(Flux.just(new PageItem(5, null, null)));
             });

    But how can this flux end up in onErrorContinue without being in an error state?
    I just want this flux to be in an error state, because an error is raised in getResult (it propagates an error signal, but windowUntil consumes it somehow).

    Can you suggest any workaround for this? (The other onError* methods won't be invoked because the flux isn't in an error state :( )
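    onErrorContinue is the odd one out here: it works by asking supporting upstream operators to swallow the error and carry on, so the flux never reaches an error state and doOnError never fires. onErrorResume composes the normal way; a sketch with the poster's own types:

        return super.getResult()
                .log("result 1")
                .windowUntil(this::hasEnoughElementForAPage, false, MAXIMUM_FINITE_ELEMENTS)
                .doOnError(e -> System.err.println("do on error"))
                // the fallback must be a Publisher of windows, i.e. Flux<Flux<PageItem>>
                .onErrorResume(e -> Flux.just(Flux.just(new PageItem(5, null, null))));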

    Kirill Marchuk
    @62mkv
    Hi all! How can I ensure that each onNext in a certain publisher (subscribed from a test method) runs on a new thread? I have exhausted my imagination trying to achieve that:
        ThreadFactory myThreadFactory = new ThreadFactoryBuilder().setDaemon(false).build();

        Scheduler scheduler = Schedulers.newElastic(1, myThreadFactory);
        Flux.range(1, 5)
                .map(i -> {
                    log.info("Request #{}", i);
                    return i;
                })
                .publishOn(scheduler, false, 1)
                .flatMap(i -> getResponseMono()
                        .delayElement(Duration.of(1, ChronoUnit.SECONDS))
                        .publishOn(scheduler)
                )
                .blockLast();
    this is just the latest iteration, but believe me, I've probably already tried 40+ versions, all kinds of .subscribeOn + .publishOn with different Schedulers. No effect: the code inside the getResponseMono publisher still runs on the same thread; the only difference is which one.
    In the current iteration it's pool-1-thread-1.
    I see in the logs that whenever getResponseMono is subscribed from an actual WebFlux application, it's executed on reactor-tcp-nio threads, but how do I ensure this happens in a test?
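    Whether getResponseMono honours any scheduler depends on how that Mono is built, but for a test the crude option is a fresh single-threaded scheduler per element; a sketch:

        Flux.range(1, 5)
                .flatMap(i -> {
                    Scheduler perElement = Schedulers.newSingle("req-" + i);
                    return getResponseMono()
                            .subscribeOn(perElement)                 // subscribe on a fresh thread
                            .doFinally(signal -> perElement.dispose());
                })
                .blockLast();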
    HaloFour
    @HaloFour
    oh man, I didn't realize they were going to make Continuation internal to the JDK. I see the change happened back in August, after the last early-access release. That's disappointing, you could do some cool stuff with Continuation. I guess you can do the same on top of VirtualThread assuming they offer an API to easily yield/park and resume the fiber, or through some java.util.concurrent helper.
    8 replies
    peterspikings
    @peterspikings

    Hi.... having a problem with bufferTimeout with a slow publisher and a busy downstream consumer. If the subscriber requests one, then bufferTimeout requests batchSize. If it then gets only one item and the buffer times out, it sends it on fine; but if it then gets another item from upstream before the downstream requests another list, it raises an error. I see there's a discussion/PR about similar issues with windowTimeout.

    I'm happy to do a PR for bufferTimeout. My suggested fix would be to do nothing instead of raising the error, leaving the items buffered; then, when the next downstream request comes in, if the previously requested amount was zero and there are items buffered, immediately pass the buffer downstream and act as though one less item had been requested (if the downstream requested more than one). Thoughts?

    2 replies
    kaladhar
    @kaladhar-mummadi
    I spent the whole day trying to release dataBuffers using doOnDiscard; can someone please help me with reactor/reactor-core#2840?
    Simon Baslé
    @simonbasle
    heads up: in order to reduce the number of community-driven discussion channels, we're considering archiving this gitter channel in favor of the more active reactor/reactor one (and we'll keep reactor-netty for netty-specific discussions)
    Deepak Bhimaraju
    @deepak-auto
    Hi there, has the behaviour for Schedulers.onHandleError changed between reactor-core versions 3.3.5 and 3.4.12? We were on Spring Boot 2.3.x (and reactor 3.3.5) before and had a Spring Integration Test that threw an error from one of the dependencies used in the reactor chain. The test expected the Schedulers.onHandleError method to handle the error but after upgrading to Spring 2.6.1 (and reactor 3.4.12), the test has started failing. Upon debugging, I see that Hooks.onErrorDropped is receiving the error. Wanted to know your thoughts on this. Thanks!
    Will also post to reactor/reactor based on @simonbasle's comment above.
    harish vashist
    @harishvashistha1:matrix.org [m]
    I used WebFlux with Spring Boot to get data from a database and send it over an SSE stream. A 1 GB kubernetes pod trying to serve 5 such SSE streams ultimately disconnects after some time with out-of-memory issues, and then no more fresh SSE calls can be established. It looks like a memory leak as well, because the pod's memory utilisation does not come down after all the SSE streams are disconnected.
    Is it expected behavior of WebFlux that each SSE flux connection to the database uses so much memory (as shown in the above error) and that the memory never comes back to the free heap area of the JVM? Help would be appreciated.
    owenammann
    @owenammann
    Hello. I am wondering: when creating a Flux stream of an upstream request, how best to record the latency of the upstream response into prometheus? I am attempting to use metrics() on my Flux, but I'm not sure what these metrics are named in prometheus. Going off this guide: https://github.com/reactor/reactor-core/blob/main/docs/asciidoc/metrics.adoc
    2 replies
    final Flux<CacheClearResponse> concat = Flux.concat(appFlux, otherServicesFlux).metrics();
    owenammann
    @owenammann
    Ideally, I would be using a histogram with buckets for percentiles
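    A sketch of naming the sequence first so the meters are distinguishable; per the guide linked above, the main Micrometer timer is reactor.flow.duration, tagged with the name given here (the name string itself is an assumption), and percentile histograms can then be enabled on that timer through the registry or Spring Boot's distribution properties:

        final Flux<CacheClearResponse> concat = Flux.concat(appFlux, otherServicesFlux)
                .name("upstream.cache.clear")   // becomes the "flow" tag on the meters
                .metrics();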
    Thanveer
    @ThanveerulHuq
    Hello, I have two fluxes (one from a mongo changestream and the other an interval stream) and am listening to them by merging them. I want to cancel/complete both of them when either of them emits a value. Which operator can I use to achieve this?
    My code would look something like this:
    Flux<String> stream1 = reactiveMongoTemplate.changeStream(Collection.class).listen();
    Flux<String> stream2 = Flux.interval(Duration.ofMinutes(1)).map(val -> "1");
    Flux.merge(stream1, stream2).subscribe();
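    next() does exactly this: it takes the first value the merge emits, cancels both upstreams, and completes; over the same stream1/stream2 defined above:

        Flux.merge(stream1, stream2).next().subscribe();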
    1 reply
    Kanai
    @kanai0618

    Hello team, I have this situation where I do
    Mono.just(id).publishOn(Schedulers.boundedElastic()).map(id -> some call)

    and as I am doing publishOn I am losing the MDC context (like traceId etc). Could you help me with how I can keep the context so that I don't lose it, as I need the traceId to flow across threads?

    1 reply