Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    pradoshtnair
    @pradoshtnair
    Indicative code :
    RunTransformation provides a Parallel Flux in transformation, RunCompletion to do some activity at record level
    RunAction does some action for each transformed record (independent of the other).
    Here I want to run RunCompletion only on final completion, but have to do sequential I want to avoid.
    Mono.just(record) .flatMap(RunTransformation::tranformParallel) .sequential() .doOnTerminate(RunCompletion::record) .subscribe(RunAction::execute, RunError::handleError);
    Scott White
    @kibbled
    does anybody know of any reactor frameworks for SFTP, like Jsch?
    Scott White
    @kibbled
    is there a way to only retry the previous step of a sequence
    Ex:
    Mono.just(3).flatMap(Mono.error(Exception())
    the flat map part of above
    Scott White
    @kibbled
    @pradoshtnair not sure what you’re trying to accomplish but I often add a .collectList after a flux followed by a .doOnNext
    then I’ll inspect the output
    and then test it with the StepVerifier in a unit test
    yangbongsoo
    @yangbongsoo

    Hello. I have a problem. but the problem is very weired. so I can't certain that the problem is bug.
    First, I will explain the situation.
    I made l7check method. but because of blocking call issue, I wrapped using Schedulers.boundedElastic() (My reactor-core version is 3.3.4)

        @GetMapping("/monitor/l7check")
        public Mono<ResponseEntity<String>> l7check() {
    
            // https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
            // DEFAULT_BOUNDED_ELASTIC_SIZE(maximum number of underlying threads to create) : 10 * Runtime.getRuntime().availableProcessors()
            // DEFAULT_BOUNDED_ELASTIC_QUEUESIZE : 100000
            // DEFAULT_TTL_SECONDS(Time-to-live for an idle) : 60
    
            return Mono
                .fromCallable(this::l7checkBlockingCall)
                .subscribeOn(Schedulers.boundedElastic());
        }

    when I run application server, There is no problem. but A day or two passed, and suddenly there was a problem with the l7 check.

    10.x.x.x - - [24/Jun/2020:13:42:10 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:48:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.996 1.995 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:51:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:52:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 2.006 2.006 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:53:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.994 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:55:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.995 "-" "-" "-"

    l7 Not all requests matter. However, once a problem occurs, it continues to occur. It didn't happen when the reactor-netty version was 0.9.2. When I raised it to 0.9.6 this time, I had a problem. It is too difficult to create a problem-reproducing sample. Please advise on this matter. Please tell me if there is any lack of information.

    1 reply
    FaizalKhan
    @smfaizalkhan

    Hello All,
    Howto store the value from a asynch call,so i can pass it to the next asynch function as params

    Say for instance as below

          public Mono<Account> fetchAccount(){
    Mono<Account> account = webClient.get().uri("/demo/account-partner").retrieve()
            .bodyToMono(Account.class).onErrorMap(exception -> new RuntimeException("account-partner"));
    return  account; }
    
        public Mono<Demo> fetchAccountAndPartner(String accountNo,String partnerNo){
    Mono<Demo> accountAndPartner = webClient.get()
            .uri(uriBuilder ->
                 UriComponentsBuilder.fromPath("/entity/{account-nr}/{partner-id}").build(accountNo,partnerNo)
            ).retrieve().bodyToMono(Demo.class);
       return accountAndPartner;

    }

    I would like to get the results of fetchAccount() and and pass the accountNo and parnterNo from Account to
    fetchAccountAndPartner(String accountNo,String partnerNo)

    Thanks in Advance

    Arsalan Siddiqui
    @arsalans
    Hi, can anyone tell me how to unsubscribe from a hot flux? I think since I still have subscribers, my jboss is not shutting down. Issue logged here, reactor/reactor-kafka#159.
    1 reply
    Sunny Shaw
    @sunny-shaw
    What is the correct way to do multiple calls to the same service API? The below code is how I am trying to achieve.
    Flux.fromIterable(userIds)
            .parallel()
            .runOn(Schedulers.elastic())
            .flatMap(this::getUser)
    1 reply
    John Franco
    @johncfranco
    Hello, I would like to open a pull request for the reactor-core repository. I'm new to GitHub and reactor-core, but I've signed the Pivotal CLA. What do I need to do?
    When I tried to push my branch to GitHub I got this:
    ERROR: Permission to reactor/reactor-core.git denied to johncfranco.
    fatal: Could not read from remote repository.
    
    Please make sure you have the correct access rights
    and the repository exists.
    John Franco
    @johncfranco
    Now I think I should have opened a GitHub issue first, anyway. I've opened reactor/reactor-core#2322 . If that leads to any discussion, I'll try to sort out repository permissions there.
    Violeta Georgieva
    @violetagg
    @johncfranco you need to work with a fork as you do not have permissions for reactor/reactor-core repository https://docs.github.com/en/github/getting-started-with-github/fork-a-repo
    Alexander-Nalbandyan
    @Alexander-Nalbandyan
    Can someone explain what is the difference between ConnectableFlux.autoConnect() and refCount()
    kaladhar
    @kaladhar-mummadi

    Hi can anyone help me please ?
    Pinging here after trying all afternoon and having no success .

    Mono.defer(() -> apiCallWhichReturnsMono())
            .doOnError(e -> callSomeAPI().subscribe())
            .map(someData -> continue with data)

    I don't want to do .subscribe() explicitly in doOnError because it loses the subscriberContext that is coming from downstream.
    Also I dont want to wait on callSomeAPI cause I’m not interested in the data.
    Any help is appreciated.

    9 replies
    I want to do the api call with subscriber context but not wait on the call.
    Do you guys think multi-cast is the solution here ?
    mplain
    @mplain
    speaking of, it's kinda weird that there are no doOnXX operators exposing context
    especally seeing how transformDeferredContextual gives an example of using context to log stuff
    1 reply
    kaladhar
    @kaladhar-mummadi
    Hi,
    Is there a way to skip stacktrace ( the 2nd part of log with all the reactor.core.publisher details ) from logs when using ReactorDebugAgent.init() ?
    Thanks in advance
    Mritunjay Dubey
    @mddubey
    Hi! Posted a question about reactor-rabbitmq here on stackoverflow. In case someone can help me out
    https://stackoverflow.com/questions/65068655/how-to-work-with-dead-letter-quueus-with-reactor-rabbitmq
    Bakulin Efim
    @efimbakulin
    Hey there!
    Is it OK that Mono receives a cancel signal after onComplete? What may lead to this situation?
    2 replies
    malone
    @malone081021
    is there a hook when hop thread with pushlishOn or subscribeOn? or reactor intercepting thread switch
    please take look on above Query , i am trying to use AWS read replica with Spring R2DBC
    but i am geting unknon host error
    Sunny Shaw
    @sunny-shaw
    Hi, I am not sure what is wrong with the below code snippet. Not able to get the correct output.
    Can someone point out what is wrong here?
    @Test
        fun stringBuilderWithWebFLux() {
            val builder = StringBuilder()
    
            builder.appendln("Initial")
    
            val list = listOf(1,2,3)
    
            val res = Flux.fromIterable(list)
                .flatMap {
                    apiCall(it).map { builder.appendln(it) }
                }.then(builder.toString().toMono())
    
            StepVerifier.create(res)
                .consumeNextWith {
                    print(it) //returns Intial instead of Initial \n Value 1 \n Value 2 \n Value 3
                }.verifyComplete()
        }
    private fun apiCall(it: Int?): Mono<String> {
        return "Value $it".toMono()
    }
    1 reply
    Dmitri Maximovich
    @maximdim_twitter

    Hi. I wonder if my code is correct. I have a method that produces a Flux and I need to pick one element from it, or do something if the Flux is empty (generate an error). So far I have the following:

            service.getFoo("foo")
                    .sort((Comparator.comparing(Foo:date)))
                    .next()
                    .switchIfEmpty(Mono.error(NotFoundException::new))
                    .doOnNext(System.out::println)
                    .subscribe();

    It seems to work fine but I'm seeing the following ERROR in the log output:

    2021-02-23 15:23:22 [main] ERROR [] reactor.core.publisher.Operators - Operator called default onErrorDropped
    reactor.core.Exceptions$ErrorCallbackNotImplemented: com.idexx.isc.realtime.server.session.ClientNotFoundException
    Caused by: NotFoundException: null
    ...