Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    MNK
    @KalyanMiryala_twitter
    why Schedulers.parallel() return true for Schedulers.isInNonBlockingThread() and not Schedulers.elastic() ?
    Sergei Egorov
    @bsideup
    @KalyanMiryala_twitter because elastic scheduler is usually for blocking tasks
    MNK
    @KalyanMiryala_twitter
    @bsideup thank you for the response , I am trying to understand why do we have requestOnSeparateThread flag when we have separate schedulers for blocking and non-blocking reactor calls. I have used requestOnSeparateThread in my application but couldn't see any difference on setting requestOnSeparateThread = true/false
    Fasar
    @fasar

    Hello Reactor team.

    Static composition can be done with combine() method, but I want a way to manage flux composition dynamically.
    I would like to use it for a IoT project where the UI can listen one or more sensor depending what the user want.
    The result flux is given to a web socket.
    Subscribe or unsubscribe to a sensor flux is done with a rest service.

    I would like to know if DirectProcessor is a good way to create a Flux multiplexer from others Flux ?
    I create a SimulatorModule to test how I can combine different flux in one flux with a DirectProcessor.
    It seams to work fine (in reality a sensor produce 1 data every 3 minutes - backpressure is not a pb now).

    I also tried with FluxSink from DirectProcessor#sink() instead DirectProcessor#onNext() but it seams to be a way less performant !
    What are the difference between these two way to feed the DirectProcessos ?

    I tried with EmitterProcessor, it seams to be less performance, is it the case ?
    Is EmitterProcessor,the only way to manage backpressure ?

    public class SimulatorModule {
    
        private DirectProcessor<Triple> dp;
        private HashMap<String, Disposable> producers = new HashMap<>();
    
        public SimulatorModule() {
            dp = DirectProcessor.create();
            long start = Instant.now().toEpochMilli();
        }
    
        public void simulate(String name, long interval) {
            removeSimulator(name);
            Disposable disposable = Flux.interval(Duration.ofMillis(interval))
                    .map(e -> new Triple(name, System.currentTimeMillis(), 1.0))
                    .subscribe(e -> dp.onNext(e));
            producers.put(name, disposable);
        }
    
        public void removeSimulator(String name) {
            Disposable disposable1 = producers.get(name);
            if (disposable1 != null) {
                disposable1.dispose();
                producers.remove(name);
            }
        }
    
        public Flux<Triple> flux() {
            return dp;
        }
    
        public static class Triple {
            public final String name;
            public final long ts;
            public final double value;
    
            public Triple(String name, long ts, double value) {
                this.name = name;
                this.ts = ts;
                this.value = value;
            }
        }
    }

    I call it with :

            simulatorModule.flux()
                    .filter(e -> e.name.equals("F1"))
                    .subscribe(e -> System.out.println("1 " + e.name +  " -  " +Thread.currentThread().getName())); 
            simulatorModule.flux()
                    .filter(e -> e.name.equals("F2"))
                    .subscribe(e -> System.out.println("2 " + e.name +  " -  " +Thread.currentThread().getName())); 
            simulatorModule.simulate("F1", 1000);
            simulatorModule.simulate("F2", 1200);

    Thanks in advance for your help !

    Vadim Bryksin
    @Bryksin

    Hi Everyone

    Could anyone please help me to understand how to wait (block until the result is ready) for 3 level parallel executions?
    here is a test prototype I'm working on

    https://pastebin.com/L04QQwGE
    basically each notification can have multiple receivers, where those multiple receivers should be contacted trough various broadcast media. each notification broadcast can be executed in parallel and broadcast result appended to each receiver
    (all notification data is automatically updated as it passed by ref)

    it does work with Thred.sleep(xxx) however I need to intercept it in map() and save updated collection to DB
    but it is not waiting for inner parallel task

    Vadim Bryksin
    @Bryksin
    Anyone??? :(
    Fasar
    @fasar
    @Bryksin : indeed. The data model (NotificationBody and childs) is completed in other threads. Most probably after the call of the myJpaRepo::saveAll. In fact, in reactive programing you describe what to do when something happens. The object PassByReferenceTest describes to do something to update the data model.
    If you want to go with this model you have to wait all task are done. Maybe with toFuture(). I tried for you but the complexity is too much for me. Sorry.
    @Bryksin Are you sure it is what you want : creating the datamodel with reactive stream ? I think what you need is to be warn when the datamodel is complete, is it ?
    pradoshtnair
    @pradoshtnair
    Hi, I have a ParallelFlux, want to execute an action when all the components in all the rails are consumed. I was trying to use .then(). But unable to understand how to use it. Can anybody share 1. way to get control after all the elements go through OnError,OnComplete across rail. Some sample code
    pradoshtnair
    @pradoshtnair
    Indicative code :
    RunTransformation provides a Parallel Flux in transformation, RunCompletion to do some activity at record level
    RunAction does some action for each transformed record (independent of the other).
    Here I want to run RunCompletion only on final completion, but have to do sequential I want to avoid.
    Mono.just(record) .flatMap(RunTransformation::tranformParallel) .sequential() .doOnTerminate(RunCompletion::record) .subscribe(RunAction::execute, RunError::handleError);
    Scott White
    @kibbled
    does anybody know of any reactor frameworks for SFTP, like Jsch?
    Scott White
    @kibbled
    is there a way to only retry the previous step of a sequence
    Ex:
    Mono.just(3).flatMap(Mono.error(Exception())
    the flat map part of above
    Scott White
    @kibbled
    @pradoshtnair not sure what you’re trying to accomplish but I often add a .collectList after a flux followed by a .doOnNext
    then I’ll inspect the output
    and then test it with the StepVerifier in a unit test
    yangbongsoo
    @yangbongsoo

    Hello. I have a problem. but the problem is very weired. so I can't certain that the problem is bug.
    First, I will explain the situation.
    I made l7check method. but because of blocking call issue, I wrapped using Schedulers.boundedElastic() (My reactor-core version is 3.3.4)

        @GetMapping("/monitor/l7check")
        public Mono<ResponseEntity<String>> l7check() {
    
            // https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
            // DEFAULT_BOUNDED_ELASTIC_SIZE(maximum number of underlying threads to create) : 10 * Runtime.getRuntime().availableProcessors()
            // DEFAULT_BOUNDED_ELASTIC_QUEUESIZE : 100000
            // DEFAULT_TTL_SECONDS(Time-to-live for an idle) : 60
    
            return Mono
                .fromCallable(this::l7checkBlockingCall)
                .subscribeOn(Schedulers.boundedElastic());
        }

    when I run application server, There is no problem. but A day or two passed, and suddenly there was a problem with the l7 check.

    10.x.x.x - - [24/Jun/2020:13:42:10 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:48:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.996 1.995 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:51:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:52:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 2.006 2.006 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:53:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.994 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:55:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.995 "-" "-" "-"

    l7 Not all requests matter. However, once a problem occurs, it continues to occur. It didn't happen when the reactor-netty version was 0.9.2. When I raised it to 0.9.6 this time, I had a problem. It is too difficult to create a problem-reproducing sample. Please advise on this matter. Please tell me if there is any lack of information.

    1 reply
    FaizalKhan
    @smfaizalkhan

    Hello All,
    Howto store the value from a asynch call,so i can pass it to the next asynch function as params

    Say for instance as below

          public Mono<Account> fetchAccount(){
    Mono<Account> account = webClient.get().uri("/demo/account-partner").retrieve()
            .bodyToMono(Account.class).onErrorMap(exception -> new RuntimeException("account-partner"));
    return  account; }
    
        public Mono<Demo> fetchAccountAndPartner(String accountNo,String partnerNo){
    Mono<Demo> accountAndPartner = webClient.get()
            .uri(uriBuilder ->
                 UriComponentsBuilder.fromPath("/entity/{account-nr}/{partner-id}").build(accountNo,partnerNo)
            ).retrieve().bodyToMono(Demo.class);
       return accountAndPartner;

    }

    I would like to get the results of fetchAccount() and and pass the accountNo and parnterNo from Account to
    fetchAccountAndPartner(String accountNo,String partnerNo)

    Thanks in Advance

    Arsalan Siddiqui
    @arsalans
    Hi, can anyone tell me how to unsubscribe from a hot flux? I think since I still have subscribers, my jboss is not shutting down. Issue logged here, reactor/reactor-kafka#159.
    1 reply
    Sunny Shaw
    @sunny-shaw
    What is the correct way to do multiple calls to the same service API? The below code is how I am trying to achieve.
    Flux.fromIterable(userIds)
            .parallel()
            .runOn(Schedulers.elastic())
            .flatMap(this::getUser)
    1 reply
    John Franco
    @johncfranco
    Hello, I would like to open a pull request for the reactor-core repository. I'm new to GitHub and reactor-core, but I've signed the Pivotal CLA. What do I need to do?
    When I tried to push my branch to GitHub I got this:
    ERROR: Permission to reactor/reactor-core.git denied to johncfranco.
    fatal: Could not read from remote repository.
    
    Please make sure you have the correct access rights
    and the repository exists.
    John Franco
    @johncfranco
    Now I think I should have opened a GitHub issue first, anyway. I've opened reactor/reactor-core#2322 . If that leads to any discussion, I'll try to sort out repository permissions there.
    Violeta Georgieva
    @violetagg
    @johncfranco you need to work with a fork as you do not have permissions for reactor/reactor-core repository https://docs.github.com/en/github/getting-started-with-github/fork-a-repo
    Alexander-Nalbandyan
    @Alexander-Nalbandyan
    Can someone explain what is the difference between ConnectableFlux.autoConnect() and refCount()
    kaladhar
    @kaladhar-mummadi

    Hi can anyone help me please ?
    Pinging here after trying all afternoon and having no success .

    Mono.defer(() -> apiCallWhichReturnsMono())
            .doOnError(e -> callSomeAPI().subscribe())
            .map(someData -> continue with data)

    I don't want to do .subscribe() explicitly in doOnError because it loses the subscriberContext that is coming from downstream.
    Also I dont want to wait on callSomeAPI cause I’m not interested in the data.
    Any help is appreciated.

    9 replies
    I want to do the api call with subscriber context but not wait on the call.
    Do you guys think multi-cast is the solution here ?
    mplain
    @mplain
    speaking of, it's kinda weird that there are no doOnXX operators exposing context
    especally seeing how transformDeferredContextual gives an example of using context to log stuff
    1 reply
    kaladhar
    @kaladhar-mummadi
    Hi,
    Is there a way to skip stacktrace ( the 2nd part of log with all the reactor.core.publisher details ) from logs when using ReactorDebugAgent.init() ?
    Thanks in advance
    Mritunjay Dubey
    @mddubey
    Hi! Posted a question about reactor-rabbitmq here on stackoverflow. In case someone can help me out
    https://stackoverflow.com/questions/65068655/how-to-work-with-dead-letter-quueus-with-reactor-rabbitmq
    Bakulin Efim
    @efimbakulin
    Hey there!
    Is it OK that Mono receives a cancel signal after onComplete? What may lead to this situation?
    2 replies
    malone
    @malone081021
    is there a hook when hop thread with pushlishOn or subscribeOn? or reactor intercepting thread switch
    please take look on above Query , i am trying to use AWS read replica with Spring R2DBC
    but i am geting unknon host error
    Sunny Shaw
    @sunny-shaw
    Hi, I am not sure what is wrong with the below code snippet. Not able to get the correct output.
    Can someone point out what is wrong here?
    @Test
        fun stringBuilderWithWebFLux() {
            val builder = StringBuilder()
    
            builder.appendln("Initial")
    
            val list = listOf(1,2,3)
    
            val res = Flux.fromIterable(list)
                .flatMap {
                    apiCall(it).map { builder.appendln(it) }
                }.then(builder.toString().toMono())
    
            StepVerifier.create(res)
                .consumeNextWith {
                    print(it) //returns Intial instead of Initial \n Value 1 \n Value 2 \n Value 3
                }.verifyComplete()
        }
    private fun apiCall(it: Int?): Mono<String> {
        return "Value $it".toMono()
    }
    1 reply
    Dmitri Maximovich
    @maximdim_twitter

    Hi. I wonder if my code is correct. I have a method that produces a Flux and I need to pick one element from it, or do something if the Flux is empty (generate an error). So far I have the following:

            service.getFoo("foo")
                    .sort((Comparator.comparing(Foo:date)))
                    .next()
                    .switchIfEmpty(Mono.error(NotFoundException::new))
                    .doOnNext(System.out::println)
                    .subscribe();

    It seems to work fine but I'm seeing the following ERROR in the log output:

    2021-02-23 15:23:22 [main] ERROR [] reactor.core.publisher.Operators - Operator called default onErrorDropped
    reactor.core.Exceptions$ErrorCallbackNotImplemented: com.idexx.isc.realtime.server.session.ClientNotFoundException
    Caused by: NotFoundException: null
    ...
    2 replies
    Ivan Rodrigues
    @ivanzitojr_twitter

    Hi folks!!

    I'm new using reactor and I faced an issue.
    In the following code

              val flux = range(1, 10)
            .log()
            .doOnCancel { countDown.countDown() }
            .doOnComplete { countDown.countDown() }
            .delayElements(Duration.ofSeconds(1))
            .doOnNext {
                val state = repository.find(receipt.movementIdentification)
                logger.info("State = $state")
               if (state == "CANCELED") throw PixCanceledException("server.error", "Pix cancelled")
                if (state == "FINISHED") countDown.countDown
            }
            .map { receipt }
            .collectList()
            .map { it.component1() }
            .publishOn(Schedulers.boundedElastic())
    
    
        flux.subscribe()
        countDown.await()

    In the code above I wanna finish/cancel a subscription of a subscriber. How can I do this ?
    Looking for some questions in the stackoverflow btw I saw that isn't a motivation to do this. In my case I could throw an exception and it will work, but I wanna do the work in the right way.
    Reading the documentation, it don't say specifically how can I send a signal from a onNext step. In the documentation explain more about signals send by example when an error occur or cancel the subscription out of the producer.
    Sincerly I expect that I could express myselsf
    Tks !!

    4 replies
    gtay003
    @gtay003
    Hi all, I've recently been switching some of the code in our app that's currently using the Processor API over to using the new Sinks API instead. The new API seems much more intuitive and well thought out, thanks for all your hard work! One thing we've been hit by is the difference between a call to processor.sink() which provides a serialised sink that gates/serialises concurrent calls to onNext(), and the Sinks.many() factory methods that fail with EmitResult.FAIL_NON_SERIALIZED when multiple producers emit concurrently. I can understand the rationale for this, and found this SO answer which suggests a 'wait and retry' failure handler to work around the issue, and this seems to be working for us. My first question is: is this a reasonable way to solve this problem; and second: are there any plans to provide some additional factory methods or decorators to do this with an API call without needing a custom failure handler? I am thinking something like Sinks.many().unicast().serialised() or something similar? Thanks in advance!
    2 replies
    Mritunjay Dubey
    @mddubey

    Hello People,

    Going to be a long read; :(

    Working with reactor on web-flux and having some performance issues when we load test with a lot of concurrent users(15-20K). We have two basic questions and possibly could help us improve the performance of our systems

    • Any specific reason CoreSubscriber methods are void and not Mono<Void>? Does that mean whatever code we will write here will be blocking? We are using slf4j-MDC to pass a trace-id into all the logs. For this we have similar approach as mentioned here. The approach is to implement a core-subscriber where we will copy the traceid to the MDC from the current context. We see a lot of CPU time is spent in this method itself. Please let us know if we are doing something wrong.

    • How does running tasks on scheduler helps if we have a single CPU? In our case we are running 10 kubernetes containers with each "1000m" CPU, which is more like a single core machine(if I understand correctly). In this case it does not matter if we run some stuff on scheduler because at the end there is a single core and nothing can be scheduled concurrently. To test my above theory I doubled the CPU in each container(2000m) and reduced the numbers of containers to 5 but there was not much difference. Theocratically, it should reduce the response time but there was not much difference.

    Please let us know if there are any pointers on these.

    Jeevjyot Singh Chhabda
    @jeevjyot
    Hi Folks,
    Trying to use FluxSink and not very sure I understand onRequest (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/FluxSink.html#onRequest-java.util.function.LongConsumer-) method. Could anyone please shed some light hot is request count is determined?
    HaloFour
    @HaloFour
    Good morning! I was wondering if anyone knew of good tools that might be used to detect if a Reactor publisher is not being used? It's a common issue in our Spring WebFlux projects that newly onboarded developers assume that they can drop a Mono<Void> on the floor as they could with a void-returning method and if possible I'd like to catch that during compilation.
    3 replies
    Archit Agarwal
    @architmmmec_twitter
    Hi Team, I am using receiverOffset.commit() in my kafka consumer(Asynchronous way). Can anyone tell approx how much time does it take to commit the offsets. I waited for 3 min still offsets were not committed. Can someone help on this
    Aleksey Vasin
    @youagree
    hello, how i can pass request attributes into rest-controller by WebFilter? now i use contextWrite to put key/value, and get this in rest-controller by deferContextual and it does not work
    Alex Mrynsky
    @amrynsky

    Just discovered unexpected behavior of Flux.cache that causing flow to hang indefinitely.

    @Test
        void flow() {
            var flux = getSource()
                    .doOnSubscribe(__ -> log.info("Loading source..."))
                    .cache(Duration.ofSeconds(2))
                    .doOnSubscribe(__ -> log.info("Pooling cycle starting..."))
                    .flatMap(this::process, 2)
                    .repeat();
    
            StepVerifier.create(flux)
                    .expectNextCount(100000)
                    .verifyComplete();
        }
    
        private Flux<Integer> getSource() {
            return Flux.just(1, 2, 3, 4, 5);
        }
    
        private Mono<Integer> process(int channel) {
            return Mono.just(channel)
                    .doOnNext(rec -> log.info("Processing: {}", rec))
                    .delayElement(Duration.ofMillis(10));
        }

    The idea is to implement continuous data processing (i.e. fetching from the external resource) based on the source (getSource).

    Tried to use cache to repeat inner flow for the same source but periodically reload the source. In addition, we need to limit concurrency for the processing.

    After several iterations flow hangs indefinitely.

    Trying to troubleshoot more but it looks like a combination of cache & flatMap with concurrency causing this behaviour.

    1 reply
    Brigen Tafilica
    @BigyG_gitlab
    Flux.fromArray(DimRating.values())
                    .parallel()
                    .runOn(Schedulers.parallel())
                    .flatMap(dimRating ->
                            Flux.fromStream(repositoryReader.findByBatchId(batchId, dimRating))
                                    .publishOn(Schedulers.parallel())
                                    .map(document -> {
                                        Account account = repositoryReader.ConvertBsonDocument2Account(document);
                                        int rc = readCount.incrementAndGet();
                                        try {
                                            processor.process(account);
                                        } catch (InvalidProcessorConfigException e) {
                                            e.printStackTrace();
                                        }
                                        if (rc % 1000 == 0)
                                            log.warn("PROCESSED 1000 ACC");
                                        return repositoryReader.getUpdate(account);
                                    })
                                    .buffer(10000)
                                    .publishOn(Schedulers.parallel())
                                    .map(this::bulkWriteResult)
                                    .doOnComplete(() -> log.info("ECL Calculation Finished"))
                    )
                    .subscribe();
    
        public BulkWriteResult bulkWriteResult(List<Pair<Query, Update>> updates) {
            BulkWriteResult resultList = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, Account.class).updateOne(updates).execute();
            log.info("ExecuteBulkBufferedCount {} matchedCount {} modifiedCount {}", updates.size(), resultList.getMatchedCount(), resultList.getModifiedCount());
            return resultList;
        }
    THIS IS GOING TO BE A LONG READ SORRY
    On the above code we have 10 different DimRating values. What we want to do is make a DB call for each dim rating in parallel to find accounts by dim rating.
    In parallel because we proved that we can read faster from mongo if we read in parallel for each dimRating than with a singe findAll call.
    We are talking about Millions of data (20M best case) and yes there is no other way we need to read all and process all.
    Now each account need to be processed processor.process(account) and this is kind of a blocking method.
    Lets say each account needs to go through some processing and this will take 1 sec for each account.
    Then after every account is processed we need to update it in db and we thought bulk update by 10000 would be a smart thing.
    Now i know i am doing something wrong because the speed for the full read-process-update for 20M accounts is worse in reactive way ,
    than with the old school creating new threads and implementing concurrency how we used to do in imperative way.
    Now i would LOVE some help here to point me what im doing wrong and what is the beast approach for this task.
    In one of my tries since reading is faster i tried to read and directly emmit to a Sink.Many.unicast which was subscribed and the process+update was done there on doOnNext,
    But it again maybe im missing something and still it was not on the performance level i was expecting it to be
    4 replies