    kota.sunilkumar
    @kota_sunilkumar_twitter
    @bsideup @simonbasle
    Sergei Egorov
    @bsideup
    @kota_sunilkumar_twitter please do not post in both reactor/reactor, reactor/reactor-core is enough
    kota.sunilkumar
    @kota_sunilkumar_twitter
    Ok...will not repeat that...my mistake...thanks for correcting me
    @bsideup, any suggestions on my above puzzle?
    Sergei Egorov
    @bsideup
    @kota_sunilkumar_twitter this does not really seem to be Reactor-related, since you're running into db connection exceptions
    make sure that you actually use the pool and not create a new connection every time
    blake-bauman
    @blake-bauman
    I'm looking to take a list of Mono<Void> instances and execute them all in parallel. Would Flux.concat(...).parallel().subscribe() do the trick? Or would I need to call subscribe() on each of the Monos individually with different Schedulers?
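    A minimal sketch of the difference, with hypothetical tasks: Flux.concat subscribes to each Mono sequentially, while Flux.merge subscribes to all of them eagerly, so merge is what runs them concurrently without subscribing to each Mono individually or touching its Scheduler.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MergeVoids {

    // Runs every Mono<Void> in the list. Flux.merge subscribes to all of
    // them eagerly, so they execute concurrently on whatever Schedulers
    // they already carry; Flux.concat would subscribe one at a time.
    static void runAll(List<Mono<Void>> tasks) {
        Flux.merge(tasks).blockLast();
    }

    public static void main(String[] args) {
        Queue<String> done = new ConcurrentLinkedQueue<>();
        runAll(List.of(
                Mono.<Void>fromRunnable(() -> done.add("task1")),
                Mono.<Void>fromRunnable(() -> done.add("task2"))));
        System.out.println(done); // both tasks have completed when blockLast returns
    }
}
```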
    Sumit Dhaniya
    @SumitDhaniya_twitter
    I’m creating a hot flux which publishes on the parallel scheduler. A subscriber consuming this flux runs in parallel on a new bounded elastic scheduler. While testing, I set the thread cap and queue cap to 1 and the program started throwing an error, which baffled me at first, but after closely going through the documentation I figured out it’s because no more tasks can be enqueued.
    Now I’m a little confused: if I increase the task queue cap to 4-5 it works again, but its queue should have overflowed in that case too, as it’s processing hundreds of elements.
    kota.sunilkumar
    @kota_sunilkumar_twitter
    @bsideup @simonbasle, does calling doAfterTerminate after filter(exchange) work asynchronously?
    (like filter(exchange).doAfterTerminate()) If not, how can I make it asynchronous?
    M3yo
    @M3yo
    Hi everyone
    I'm kinda new to Reactor. I've read the basics and all, but I'm a bit stuck on my implementation and need some help.
    I've created an endpoint which wraps my business endpoint with data-model transformation. My new endpoint calls 3 methods which each return a Mono<myobject>. The first one transforms "outsideModel" to "internalModel" (through a microserviceA), then I call my business microserviceB, and finally I call microserviceA again to transform back from internalModel to outsideModel. As my microserviceA is stateless, I need the original data from the "outsideModel" of step 1. My 3 Mono calls are chained, and I'm unsure how I can store/save the original "outsideModel" and reuse it when I need it for my last step. Can I simply store it in a local variable?
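    One common pattern, sketched here with String stand-ins for the hypothetical models and services: a separate local variable is not needed, because the last step's lambda can simply capture the (effectively final) original value.

```java
import reactor.core.publisher.Mono;

public class CarryOriginal {

    // Stand-ins for the hypothetical microservice calls
    // (A transforms, B does the business logic)
    static Mono<String> toInternal(String outside) {
        return Mono.just("internal(" + outside + ")");
    }

    static Mono<String> business(String internal) {
        return Mono.just("processed(" + internal + ")");
    }

    static Mono<String> toOutside(String result, String original) {
        return Mono.just(result + " restored from " + original);
    }

    static Mono<String> handle(String outsideModel) {
        return toInternal(outsideModel)
                .flatMap(CarryOriginal::business)
                // outsideModel is effectively final, so the last step just
                // captures it in the lambda: no mutable state needed
                .flatMap(result -> toOutside(result, outsideModel));
    }

    public static void main(String[] args) {
        System.out.println(handle("outside").block());
    }
}
```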
    atarutin
    @atarutin
    hello
    atarutin
    @atarutin
    i'm a bit lost... i've a list of objects (id, parentId, children) and initially only id and parentId are populated. based on id and parentId, i need to make them into a hierarchy and populate children accordingly... poking around but no solution yet... any hints?
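    One plain-Java way to do this (no Reactor needed), sketched under the assumption that ids are unique and a null or unknown parentId marks a root: index all nodes by id in one pass, then attach each node to its parent's children list in a second pass.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TreeBuilder {

    static class Node {
        final String id;
        final String parentId;
        final List<Node> children = new ArrayList<>();

        Node(String id, String parentId) {
            this.id = id;
            this.parentId = parentId;
        }
    }

    // Pass 1: index every node by id. Pass 2: attach each node to its
    // parent's children list; nodes with no (known) parent become roots.
    static List<Node> buildTree(List<Node> flat) {
        Map<String, Node> byId = new HashMap<>();
        for (Node n : flat) {
            byId.put(n.id, n);
        }
        List<Node> roots = new ArrayList<>();
        for (Node n : flat) {
            Node parent = n.parentId == null ? null : byId.get(n.parentId);
            if (parent != null) {
                parent.children.add(n);
            } else {
                roots.add(n);
            }
        }
        return roots;
    }

    public static void main(String[] args) {
        List<Node> flat = List.of(
                new Node("1", null),
                new Node("2", "1"),
                new Node("3", "1"),
                new Node("4", "2"));
        List<Node> roots = buildTree(flat);
        System.out.println(roots.size() + " root, "
                + roots.get(0).children.size() + " children under it");
    }
}
```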
    Alvaro Sevilla
    @alvarosevilla95
    Hi! Hope this is the right place to ask; would really appreciate any input.
    I have a Flux instance that I'm iterating through using toIterable. I can't find a way to cancel the underlying generator if anything goes wrong on the consumer side; it will only close once I've iterated through enough items to exhaust it. Is there any way to trigger a cancellation event on a Flux that is being consumed through toIterable()?
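    One workaround, assuming switching from toIterable() to toStream() is acceptable: toStream() exposes close(), and closing the Stream cancels the underlying subscription, so a try-with-resources block cancels the source even when the consumer exits early or throws.

```java
import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class CancelOnClose {

    // Consumes at most `limit` items and reports whether the source was cancelled
    static boolean consumeAndCancel(int limit) {
        AtomicBoolean cancelled = new AtomicBoolean();
        Flux<Integer> source = Flux.range(1, 1_000_000)
                .doOnCancel(() -> cancelled.set(true));

        // toStream() (unlike toIterable()) exposes close(): leaving the
        // try-with-resources block cancels the underlying subscription,
        // so an exception or early exit on the consumer side stops the source
        try (Stream<Integer> stream = source.toStream()) {
            stream.limit(limit).forEach(i -> { });
        }
        return cancelled.get();
    }

    public static void main(String[] args) {
        System.out.println("cancelled: " + consumeAndCancel(3));
    }
}
```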
    Scott White
    @kibbled
    Any thoughts on reactor vs ktor specifically the http client?
    Hongde Liu
    @enginespot

    question about 'request' method

    final Flux<Boolean> randoms = Flux.generate(sink -> sink.next(true));
    randoms.filter(x -> x == true).subscribe(x -> System.out.println(x));

    As I understand it, for the above code the LambdaSubscriber requests data from the FilterFuseableSubscriber (the downstream requesting from its upstream), and the FilterFuseableSubscriber in turn requests data from its own upstream. My question is about this:

    @Override
    public void request(long n) {
        s.request(n);
    }

    Why is the request number the same as the downstream's? If the downstream requests 5 elements, the filter should request more than 5 from upstream, since it will filter some data out. Can somebody explain the logic? :)
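    As far as I can tell from reactor-core's sources, the compensation happens in onNext rather than in request(): when the predicate rejects an element, the filter subscriber asks its upstream for one more element (request(1)) to make up for the dropped one. A small demo using a bounded variant of the generate() example and doOnRequest to observe what filter asks upstream:

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

public class FilterRequests {

    static List<Long> observedRequests() {
        List<Long> requests = new ArrayList<>();

        // bounded variant of the generate() example: emits 1..4 then completes
        Flux<Integer> source = Flux.<Integer, Integer>generate(
                () -> 1,
                (state, sink) -> {
                    sink.next(state);
                    if (state == 4) {
                        sink.complete();
                    }
                    return state + 1;
                });

        source.doOnRequest(requests::add)       // record what filter asks upstream
                .filter(x -> x % 2 == 0)
                .subscribe(x -> { }, e -> { }, () -> { },
                        s -> s.request(2));     // downstream asks for exactly 2

        return requests;
    }

    public static void main(String[] args) {
        // filter forwards the downstream request(2) unchanged, then tops up
        // with request(1) for each element it drops, so in total the upstream
        // is asked for more than the downstream requested
        System.out.println(observedRequests());
    }
}
```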

    Kirill Marchuk
    @62mkv
    Hi. I would like to trigger a reactive action when some particular error occurs upstream and then return the original error. Wanted to use .onErrorMap but it says it's for "synchronous" functions only
    .onErrorResume then ?
    Sergei Egorov
    @bsideup
    yes, onErrorResume allows you to handle the error asynchronously
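    A sketch of that pattern, with a hypothetical reactToError action: run the reactive side effect, then re-emit the original error with then(Mono.error(e)).

```java
import reactor.core.publisher.Mono;

public class SideEffectOnError {

    // Hypothetical reactive action to trigger when the error occurs
    static Mono<Void> reactToError(Throwable e) {
        return Mono.empty();
    }

    // Runs the reactive action on error, then re-emits the original error
    static <T> Mono<T> withErrorAction(Mono<T> source) {
        return source.onErrorResume(e -> reactToError(e).then(Mono.<T>error(e)));
    }

    public static void main(String[] args) {
        withErrorAction(Mono.error(new IllegalStateException("boom")))
                .subscribe(v -> { },
                        e -> System.out.println("still got: " + e.getMessage()));
    }
}
```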
    MNK
    @KalyanMiryala_twitter
    why does Schedulers.parallel() return true for Schedulers.isInNonBlockingThread() but Schedulers.elastic() doesn't?
    Sergei Egorov
    @bsideup
    @KalyanMiryala_twitter because elastic scheduler is usually for blocking tasks
    MNK
    @KalyanMiryala_twitter
    @bsideup thank you for the response. I am trying to understand why we have the requestOnSeparateThread flag when we have separate schedulers for blocking and non-blocking Reactor calls. I have used requestOnSeparateThread in my application but couldn't see any difference when setting requestOnSeparateThread = true/false.
    Fasar
    @fasar

    Hello Reactor team.

    Static composition can be done with the combine() method, but I want a way to manage flux composition dynamically.
    I would like to use it for an IoT project where the UI can listen to one or more sensors depending on what the user wants.
    The resulting flux is given to a web socket.
    Subscribing or unsubscribing to a sensor flux is done with a REST service.

    I would like to know if DirectProcessor is a good way to create a Flux multiplexer from other Fluxes.
    I created a SimulatorModule to test how I can combine different fluxes into one flux with a DirectProcessor.
    It seems to work fine (in reality a sensor produces one reading every 3 minutes, so backpressure is not a problem for now).

    I also tried with the FluxSink from DirectProcessor#sink() instead of DirectProcessor#onNext(), but it seems to be way less performant!
    What is the difference between these two ways of feeding the DirectProcessor?

    I also tried with EmitterProcessor; it seems to be less performant. Is that the case?
    Is EmitterProcessor the only way to manage backpressure?

    public class SimulatorModule {
    
        private DirectProcessor<Triple> dp;
        private HashMap<String, Disposable> producers = new HashMap<>();
    
        public SimulatorModule() {
            dp = DirectProcessor.create();
            long start = Instant.now().toEpochMilli();
        }
    
        public void simulate(String name, long interval) {
            removeSimulator(name);
            Disposable disposable = Flux.interval(Duration.ofMillis(interval))
                    .map(e -> new Triple(name, System.currentTimeMillis(), 1.0))
                    .subscribe(e -> dp.onNext(e));
            producers.put(name, disposable);
        }
    
        public void removeSimulator(String name) {
            Disposable disposable1 = producers.get(name);
            if (disposable1 != null) {
                disposable1.dispose();
                producers.remove(name);
            }
        }
    
        public Flux<Triple> flux() {
            return dp;
        }
    
        public static class Triple {
            public final String name;
            public final long ts;
            public final double value;
    
            public Triple(String name, long ts, double value) {
                this.name = name;
                this.ts = ts;
                this.value = value;
            }
        }
    }

    I call it with :

            simulatorModule.flux()
                    .filter(e -> e.name.equals("F1"))
                    .subscribe(e -> System.out.println("1 " + e.name +  " -  " +Thread.currentThread().getName())); 
            simulatorModule.flux()
                    .filter(e -> e.name.equals("F2"))
                    .subscribe(e -> System.out.println("2 " + e.name +  " -  " +Thread.currentThread().getName())); 
            simulatorModule.simulate("F1", 1000);
            simulatorModule.simulate("F2", 1200);

    Thanks in advance for your help !

    Vadim Bryksin
    @Bryksin

    Hi Everyone

    Could anyone please help me understand how to wait (block until the result is ready) for 3 levels of parallel execution?
    here is a test prototype I'm working on

    https://pastebin.com/L04QQwGE
    basically each notification can have multiple receivers, and those receivers should be contacted through various broadcast media. Each notification broadcast can be executed in parallel, with the broadcast result appended to each receiver
    (all notification data is automatically updated as it is passed by reference)

    it does work with Thread.sleep(xxx), however I need to intercept it in map() and save the updated collection to the DB,
    but it is not waiting for the inner parallel tasks
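    One way to make the outer pipeline wait, sketched with hypothetical broadcast/receiver stand-ins: compose the inner parallel work into the outer chain instead of relying on pass-by-reference side effects. collectList()/block() then complete only after every inner broadcast has finished, so a subsequent map (e.g. the DB save) sees fully populated data.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.List;

public class NestedParallel {

    // Hypothetical broadcast call: contact one receiver over one medium
    static Mono<String> broadcast(String receiver) {
        return Mono.fromCallable(() -> receiver + ":sent")
                .subscribeOn(Schedulers.boundedElastic());
    }

    static List<String> processAll(List<String> notifications) {
        return Flux.fromIterable(notifications)
                // inner fan-out: all receivers of one notification in parallel
                .flatMap(n -> Flux.fromIterable(List.of(n + "-r1", n + "-r2"))
                        .flatMap(NestedParallel::broadcast)
                        .collectList()
                        // this map runs only after ALL inner broadcasts finish,
                        // so it is a safe place to save the updated data
                        .map(sent -> n + sent))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("n1", "n2")));
    }
}
```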

    Vadim Bryksin
    @Bryksin
    Anyone??? :(
    Fasar
    @fasar
    @Bryksin : indeed. The data model (NotificationBody and children) is completed in other threads, most probably after the call to myJpaRepo::saveAll. In reactive programming you describe what to do when something happens; the PassByReferenceTest object describes something to do to update the data model.
    If you want to go with this model you have to wait until all tasks are done, maybe with toFuture(). I tried for you but the complexity is too much for me. Sorry.
    @Bryksin Are you sure this is what you want: building the data model with a reactive stream? I think what you need is to be notified when the data model is complete, is it?
    pradoshtnair
    @pradoshtnair
    Hi, I have a ParallelFlux and want to execute an action when all the elements in all the rails have been consumed. I was trying to use .then() but am unable to understand how to use it. Can anybody share (1) a way to get control after all the elements have gone through onError/onComplete across rails, and (2) some sample code?
    pradoshtnair
    @pradoshtnair
    Indicative code:
    RunTransformation provides a ParallelFlux in transformation, RunCompletion does some activity at the record level,
    and RunAction does some action for each transformed record (independent of the others).
    Here I want to run RunCompletion only on final completion, but I have to go sequential, which I want to avoid.

    Mono.just(record)
        .flatMap(RunTransformation::tranformParallel)
        .sequential()
        .doOnTerminate(RunCompletion::record)
        .subscribe(RunAction::execute, RunError::handleError);
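    If staying parallel is the goal, ParallelFlux#then() might help: it returns a Mono<Void> that completes only when every rail has completed, so a completion action can be chained after it while the per-record work stays on the rails. A sketch with stand-in actions:

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.ConcurrentLinkedQueue;

public class ParallelThen {

    static int processCount() {
        ConcurrentLinkedQueue<Integer> processed = new ConcurrentLinkedQueue<>();

        Flux.range(1, 8)
                .parallel()
                .runOn(Schedulers.parallel())
                .doOnNext(processed::add)   // per-record action, rails stay parallel
                .then()                     // Mono<Void>: completes when ALL rails complete
                .doOnTerminate(() -> System.out.println("all rails done"))
                .block();

        return processed.size();
    }

    public static void main(String[] args) {
        System.out.println(processCount() + " records processed before completion");
    }
}
```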
    Scott White
    @kibbled
    does anybody know of any reactor frameworks for SFTP, like Jsch?
    Scott White
    @kibbled
    is there a way to retry only the previous step of a sequence?
    Ex:
    Mono.just(3).flatMap(v -> Mono.error(new Exception()))
    the flatMap part of the above
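    retry (and retryWhen) apply to whatever they are attached to, so scoping the retry to the inner Mono inside flatMap retries only that step; the outer Mono.just(3) is not re-subscribed. A sketch with a deliberately flaky inner step:

```java
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicInteger;

public class RetryInnerStep {

    static int callWithRetry(AtomicInteger attempts) {
        return Mono.just(3)
                .flatMap(v -> Mono.fromCallable(() -> {
                            // fails on the first two attempts, succeeds on the third
                            if (attempts.incrementAndGet() < 3) {
                                throw new IllegalStateException("flaky");
                            }
                            return v * 10;
                        })
                        .retry(5))   // retry scoped to the inner step only
                .block();
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        System.out.println(callWithRetry(attempts) + " after " + attempts.get() + " attempts");
    }
}
```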
    Scott White
    @kibbled
    @pradoshtnair not sure what you’re trying to accomplish but I often add a .collectList after a flux followed by a .doOnNext
    then I’ll inspect the output
    and then test it with the StepVerifier in a unit test
    yangbongsoo
    @yangbongsoo

    Hello. I have a problem, but the problem is very weird, so I can't be certain that it is a bug.
    First, I will explain the situation.
    I made an l7check method, but because of a blocking-call issue, I wrapped it using Schedulers.boundedElastic() (my reactor-core version is 3.3.4)

        @GetMapping("/monitor/l7check")
        public Mono<ResponseEntity<String>> l7check() {
    
            // https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
            // DEFAULT_BOUNDED_ELASTIC_SIZE(maximum number of underlying threads to create) : 10 * Runtime.getRuntime().availableProcessors()
            // DEFAULT_BOUNDED_ELASTIC_QUEUESIZE : 100000
            // DEFAULT_TTL_SECONDS(Time-to-live for an idle) : 60
    
            return Mono
                .fromCallable(this::l7checkBlockingCall)
                .subscribeOn(Schedulers.boundedElastic());
        }

    When I run the application server, there is no problem at first. But after a day or two passed, suddenly there was a problem with the l7 check.

    10.x.x.x - - [24/Jun/2020:13:42:10 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:48:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.996 1.995 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:51:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.994 1.994 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:52:16 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 2.006 2.006 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:53:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.994 "-" "-" "-"
    10.x.x.x - - [24/Jun/2020:13:55:06 +0900] "10.x.x.x" "GET /monitor/l7check HTTP/1.1" 499 0 1.995 1.995 "-" "-" "-"

    Not all l7check requests fail. However, once the problem occurs, it continues to occur. It didn't happen when the reactor-netty version was 0.9.2; when I raised it to 0.9.6 this time, I had the problem. It is too difficult to create a sample that reproduces the problem. Please advise on this matter, and tell me if any information is lacking.

    FaizalKhan
    @smfaizalkhan

    Hello All,
    How do I store the value from an async call, so I can pass it to the next async function as params?

    Say for instance as below

    public Mono<Account> fetchAccount() {
        Mono<Account> account = webClient.get().uri("/demo/account-partner").retrieve()
                .bodyToMono(Account.class)
                .onErrorMap(exception -> new RuntimeException("account-partner"));
        return account;
    }

    public Mono<Demo> fetchAccountAndPartner(String accountNo, String partnerNo) {
        Mono<Demo> accountAndPartner = webClient.get()
                .uri(uriBuilder ->
                        UriComponentsBuilder.fromPath("/entity/{account-nr}/{partner-id}").build(accountNo, partnerNo))
                .retrieve().bodyToMono(Demo.class);
        return accountAndPartner;
    }

    I would like to get the result of fetchAccount() and pass the accountNo and partnerNo from the Account to
    fetchAccountAndPartner(String accountNo, String partnerNo)

    Thanks in Advance
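    flatMap is the usual answer here: the Account emitted by the first call becomes the lambda parameter of the second call, so nothing needs to be stored outside the chain. A sketch with stand-ins for the two WebClient calls (the field names on Account are assumptions):

```java
import reactor.core.publisher.Mono;

public class ChainCalls {

    static class Account {
        final String accountNo;
        final String partnerNo;

        Account(String accountNo, String partnerNo) {
            this.accountNo = accountNo;
            this.partnerNo = partnerNo;
        }
    }

    // Stand-ins for the two WebClient calls in the question
    static Mono<Account> fetchAccount() {
        return Mono.just(new Account("A-1", "P-9"));
    }

    static Mono<String> fetchAccountAndPartner(String accountNo, String partnerNo) {
        return Mono.just(accountNo + "/" + partnerNo);
    }

    // The async result of the first call is "stored" as the lambda parameter
    static Mono<String> chained() {
        return fetchAccount()
                .flatMap(a -> fetchAccountAndPartner(a.accountNo, a.partnerNo));
    }

    public static void main(String[] args) {
        System.out.println(chained().block()); // A-1/P-9
    }
}
```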

    Arsalan Siddiqui
    @arsalans
    Hi, can anyone tell me how to unsubscribe from a hot flux? I think since I still have subscribers, my jboss is not shutting down. Issue logged here, reactor/reactor-kafka#159.
    Sunny Shaw
    @sunny-shaw
    What is the correct way to make multiple calls to the same service API? The code below is how I am trying to achieve it.
    Flux.fromIterable(userIds)
            .parallel()
            .runOn(Schedulers.elastic())
            .flatMap(this::getUser)
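    Worth noting that flatMap alone already subscribes to the inner publishers concurrently, and it takes an optional concurrency argument to cap the number of in-flight calls, so parallel()/runOn() is often unnecessary for I/O-bound fan-out. A sketch with a stand-in getUser:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.List;

public class ConcurrentCalls {

    // Stand-in for the per-user service call
    static Mono<String> getUser(Integer id) {
        return Mono.fromCallable(() -> "user-" + id)
                .subscribeOn(Schedulers.boundedElastic());
    }

    static List<String> fetchAll(List<Integer> userIds) {
        return Flux.fromIterable(userIds)
                // at most 2 inner subscriptions in flight at once
                .flatMap(ConcurrentCalls::getUser, 2)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of(1, 2, 3, 4)).size() + " users fetched");
    }
}
```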
    John Franco
    @johncfranco
    Hello, I would like to open a pull request for the reactor-core repository. I'm new to GitHub and reactor-core, but I've signed the Pivotal CLA. What do I need to do?
    When I tried to push my branch to GitHub I got this:
    ERROR: Permission to reactor/reactor-core.git denied to johncfranco.
    fatal: Could not read from remote repository.
    
    Please make sure you have the correct access rights
    and the repository exists.
    John Franco
    @johncfranco
    Now I think I should have opened a GitHub issue first, anyway. I've opened reactor/reactor-core#2322 . If that leads to any discussion, I'll try to sort out repository permissions there.
    Violeta Georgieva
    @violetagg
    @johncfranco you need to work with a fork as you do not have permissions for reactor/reactor-core repository https://docs.github.com/en/github/getting-started-with-github/fork-a-repo
    Alexander-Nalbandyan
    @Alexander-Nalbandyan
    Can someone explain the difference between ConnectableFlux.autoConnect() and refCount()?
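    Short version, as I understand it: both connect to the upstream once a subscriber threshold is reached, but refCount(n) also disconnects (cancels the source) when the subscriber count drops back to zero, while autoConnect(n) stays connected once triggered. A sketch showing the refCount reconnect behavior:

```java
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class AutoConnectVsRefCount {

    static int refCountConnections() {
        AtomicInteger connections = new AtomicInteger();

        // refCount(1): connects on the 1st subscriber and disconnects when
        // the subscriber count drops back to zero
        Flux<Long> refCounted = Flux.interval(Duration.ofMillis(10))
                .doOnSubscribe(s -> connections.incrementAndGet())
                .publish()
                .refCount(1);

        refCounted.take(2).blockLast();   // connect #1; count drops to 0 after
        refCounted.take(2).blockLast();   // a new subscriber reconnects: #2
        return connections.get();
        // autoConnect(1) would have connected once and kept the interval
        // running with no subscribers left, so the counter would stay at 1
    }

    public static void main(String[] args) {
        System.out.println("refCount connected " + refCountConnections() + " times");
    }
}
```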
    kaladhar
    @kaladhar-mummadi

    Hi, can anyone help me please?
    Pinging here after trying all afternoon and having no success.

    Mono.defer(() -> apiCallWhichReturnsMono())
            .doOnError(e -> callSomeAPI().subscribe())
            .map(someData -> continue with data)

    I don't want to call .subscribe() explicitly in doOnError because it loses the subscriberContext that comes from downstream.
    Also I don't want to wait on callSomeAPI because I'm not interested in its data.
    Any help is appreciated.
