    @Bryksin Are you sure that is what you want: creating the data model with a reactive stream? I think what you need is to be warned when the data model is complete, is that right?
    Hi, I have a ParallelFlux and want to execute an action once all the elements in all the rails have been consumed. I was trying to use .then(), but I can't work out how to use it. Can anybody share a way to get control after all the elements have gone through onError/onComplete across the rails? Some sample code would help.
    Indicative code:
    RunTransformation provides a ParallelFlux in the transformation; RunCompletion does some activity at record level.
    RunAction performs an action for each transformed record (independent of the others).
    Here I want to run RunCompletion only on final completion, but I have to use sequential(), which I want to avoid.
    Mono.just(record)
        .flatMap(RunTransformation::tranformParallel)
        .sequential()
        .doOnTerminate(RunCompletion::record)
        .subscribe(RunAction::execute, RunError::handleError);
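    A minimal sketch of one way to approach the question above, assuming reactor-core is on the classpath. ParallelFlux.then() returns a Mono<Void> that terminates only once every rail has terminated, so a completion step can be chained after it without calling sequential(). The class name, the transform method, and the counter are hypothetical stand-ins for RunTransformation and RunCompletion, not the poster's code.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ParallelCompletionSketch {

    // Hypothetical per-record transformation (stands in for RunTransformation).
    static int transform(int record) {
        return record * 2;
    }

    // Processes the records on parallel rails and returns a Mono that emits
    // the number of processed records only after all rails have completed.
    static Mono<Integer> processAll(int count) {
        AtomicInteger processed = new AtomicInteger();
        return Flux.range(1, count)
                .parallel()
                .runOn(Schedulers.parallel())
                .map(ParallelCompletionSketch::transform)
                .doOnNext(value -> processed.incrementAndGet()) // per-record action
                .then()                                         // Mono<Void>: all rails done
                .then(Mono.fromSupplier(() -> processed.get()));
    }

    public static void main(String[] args) {
        Integer total = processAll(100).block();
        System.out.println("completed after " + total + " records");
    }
}
```

    A completion callback could also be attached directly to the Mono returned by then(), for example with doOnTerminate, if it should run on error as well as on normal completion.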
    Scott White
    does anybody know of any reactor frameworks for SFTP, like Jsch?
    Scott White
    is there a way to only retry the previous step of a sequence
    the flat map part of above
    Scott White
    @pradoshtnair not sure what you’re trying to accomplish but I often add a .collectList after a flux followed by a .doOnNext
    then I’ll inspect the output
    and then test it with the StepVerifier in a unit test

    Hello. I have a problem, but it is very weird, so I can't be certain that it is a bug.
    First, I will explain the situation.
    I wrote an l7check method, but because of a blocking-call issue I wrapped it using Schedulers.boundedElastic() (my reactor-core version is 3.3.4).

        public Mono<ResponseEntity<String>> l7check() {
            // https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
            // DEFAULT_BOUNDED_ELASTIC_SIZE(maximum number of underlying threads to create) : 10 * Runtime.getRuntime().availableProcessors()
            // DEFAULT_TTL_SECONDS(Time-to-live for an idle) : 60
            return Mono

    When I start the application server, there is no problem. But after a day or two, a problem suddenly appeared with the l7 check.

    10.x.x.x - - [24/Jun/2020:13:42:10 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:48:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.996 1.995 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:51:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:52:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 2.006 2.006 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:53:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.994 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:55:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.995 "-" "-" "-"

    Not all l7 requests are affected. However, once the problem occurs, it keeps occurring. It didn't happen with reactor-netty 0.9.2; the problem appeared after I upgraded to 0.9.6. It is too difficult to create a sample that reproduces the problem. Please advise on this matter, and tell me if any information is missing.

    Hello All,
    How do I store the value from an async call so I can pass it to the next async function as parameters?

    For instance, as below:

        public Mono<Account> fetchAccount() {
            Mono<Account> account = webClient.get().uri("/demo/account-partner").retrieve()
                    .bodyToMono(Account.class)
                    .onErrorMap(exception -> new RuntimeException("account-partner"));
            return account;
        }
        public Mono<Demo> fetchAccountAndPartner(String accountNo,String partnerNo){
    Mono<Demo> accountAndPartner = webClient.get()
            .uri(uriBuilder ->
       return accountAndPartner;


    I would like to get the result of fetchAccount() and pass the accountNo and partnerNo from Account to
    fetchAccountAndPartner(String accountNo, String partnerNo).

    Thanks in Advance
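
    A minimal sketch of one common way to chain two such calls, assuming reactor-core is on the classpath: flatMap hands the emitted Account to a function that returns the next Mono. The Account and Demo classes and both fetch methods below are simplified, hypothetical stubs that return fixed values; a real version would issue the WebClient requests from the question.

```java
import reactor.core.publisher.Mono;

public class ChainedCallsSketch {

    // Hypothetical stand-in for the Account type in the question.
    static final class Account {
        final String accountNo;
        final String partnerNo;

        Account(String accountNo, String partnerNo) {
            this.accountNo = accountNo;
            this.partnerNo = partnerNo;
        }
    }

    // Hypothetical stand-in for the Demo type in the question.
    static final class Demo {
        final String summary;

        Demo(String summary) {
            this.summary = summary;
        }
    }

    // Stub for the first asynchronous call.
    static Mono<Account> fetchAccount() {
        return Mono.just(new Account("A-1", "P-9"));
    }

    // Stub for the second asynchronous call, which needs values from the first.
    static Mono<Demo> fetchAccountAndPartner(String accountNo, String partnerNo) {
        return Mono.just(new Demo(accountNo + "/" + partnerNo));
    }

    // flatMap subscribes to the second call only once the Account has arrived.
    static Mono<Demo> fetchDemo() {
        return fetchAccount()
                .flatMap(account -> fetchAccountAndPartner(account.accountNo, account.partnerNo));
    }

    public static void main(String[] args) {
        System.out.println(fetchDemo().block().summary); // prints "A-1/P-9"
    }
}
```

    Nothing needs to be stored in a field or local variable here; the value flows through the lambda parameter of flatMap.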

    Arsalan Siddiqui
    Hi, can anyone tell me how to unsubscribe from a hot flux? I think since I still have subscribers, my jboss is not shutting down. Issue logged here, reactor/reactor-kafka#159.
    Sunny Shaw
    What is the correct way to make multiple calls to the same service API? The code below is how I am trying to achieve it.
    John Franco
    Hello, I would like to open a pull request for the reactor-core repository. I'm new to GitHub and reactor-core, but I've signed the Pivotal CLA. What do I need to do?
    When I tried to push my branch to GitHub I got this:
    ERROR: Permission to reactor/reactor-core.git denied to johncfranco.
    fatal: Could not read from remote repository.
    Please make sure you have the correct access rights
    and the repository exists.
    John Franco
    Now I think I should have opened a GitHub issue first, anyway. I've opened reactor/reactor-core#2322 . If that leads to any discussion, I'll try to sort out repository permissions there.
    Violeta Georgieva
    @johncfranco you need to work with a fork as you do not have permissions for reactor/reactor-core repository https://docs.github.com/en/github/getting-started-with-github/fork-a-repo
    Can someone explain the difference between ConnectableFlux.autoConnect() and refCount()?

    Hi, can anyone help me please?
    Pinging here after trying all afternoon with no success.

    Mono.defer(() -> apiCallWhichReturnsMono())
            .doOnError(e -> callSomeAPI().subscribe())
            .map(someData -> continue with data)

    I don't want to call .subscribe() explicitly in doOnError because it loses the subscriberContext that is coming from downstream.
    Also, I don't want to wait on callSomeAPI because I'm not interested in the data.
    Any help is appreciated.

    I want to make the API call with the subscriber context but not wait on the call.
    Do you guys think multicast is the solution here?
    speaking of, it's kinda weird that there are no doOnXX operators exposing context
    especially seeing how transformDeferredContextual gives an example of using context to log stuff
    Is there a way to omit the stack trace (the second part of the log, with all the reactor.core.publisher details) from logs when using ReactorDebugAgent.init()?
    Thanks in advance
    Mritunjay Dubey
    Hi! I posted a question about reactor-rabbitmq on Stack Overflow, in case someone can help me out.
    Bakulin Efim
    Hey there!
    Is it OK that Mono receives a cancel signal after onComplete? What may lead to this situation?
    Is there a hook for when execution hops threads with publishOn or subscribeOn? Or some way to have Reactor intercept the thread switch?
    Please take a look at the above query. I am trying to use an AWS read replica with Spring R2DBC,
    but I am getting an unknown host error.
    Sunny Shaw
    Hi, I am not sure what is wrong with the below code snippet. Not able to get the correct output.
    Can someone point out what is wrong here?
        fun stringBuilderWithWebFLux() {
            val builder = StringBuilder()
            val list = listOf(1,2,3)
            val res = Flux.fromIterable(list)
                .flatMap {
                    apiCall(it).map { builder.appendln(it) }
                .consumeNextWith {
                    print(it) //returns Intial instead of Initial \n Value 1 \n Value 2 \n Value 3
    private fun apiCall(it: Int?): Mono<String> {
        return "Value $it".toMono()
    Dmitri Maximovich

    Hi. I wonder if my code is correct. I have a method that produces a Flux and I need to pick one element from it, or do something if the Flux is empty (generate an error). So far I have the following:


    It seems to work fine but I'm seeing the following ERROR in the log output:

    2021-02-23 15:23:22 [main] ERROR [] reactor.core.publisher.Operators - Operator called default onErrorDropped
    reactor.core.Exceptions$ErrorCallbackNotImplemented: com.idexx.isc.realtime.server.session.ClientNotFoundException
    Caused by: NotFoundException: null
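
    A minimal sketch of one way to pick the first element or fail when the Flux is empty, assuming reactor-core is on the classpath. As far as I know, ErrorCallbackNotImplemented is what Reactor reports when subscribe() is called without an error consumer, so the sketch passes one explicitly; whether that is the cause in the code above cannot be confirmed from the log alone. The class, method, and exception names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FirstOrErrorSketch {

    // Hypothetical exception (stands in for the ClientNotFoundException in the log).
    static final class NotFoundException extends RuntimeException {
        NotFoundException(String message) {
            super(message);
        }
    }

    // Takes the first element, or signals NotFoundException if the source is empty.
    static <T> Mono<T> firstOrError(Flux<T> source) {
        return source.next()
                .switchIfEmpty(Mono.error(new NotFoundException("no element")));
    }

    // Subscribes with both a value consumer and an error consumer, so an
    // error is delivered to the second lambda instead of being dropped.
    static String describe(Flux<String> source) {
        AtomicReference<String> outcome = new AtomicReference<>("nothing");
        firstOrError(source).subscribe(
                value -> outcome.set("value: " + value),
                error -> outcome.set("error: " + error.getMessage()));
        return outcome.get(); // the sources used here emit synchronously
    }

    public static void main(String[] args) {
        System.out.println(describe(Flux.just("a", "b"))); // prints "value: a"
        System.out.println(describe(Flux.empty()));        // prints "error: no element"
    }
}
```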