    Joshua Street
    Say those classes are Client1 and Client2. I want to call each in a Mono.zip so that they run concurrently.
    I am trying to verify that they run concurrently by using a VirtualTimeScheduler,
    except I cannot figure out how to create a mock delayed response to force me to add the right schedulers to my zip method arguments.
    public Mono<Tuple2<String, String>> getPair() {
        return Mono.zip(client1.call(), client2.call());
    }
    i am pretty certain i will have to schedule those client methods on their own schedulers
    but i am not sure
    Joshua Street
    to put it as a one-liner: I want a test that will force me to ensure concurrency of the Mono.zip operator, using a VirtualTimeScheduler so I am not waiting in real time for tests to finish
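A sketch of how such a test could look, assuming hypothetical client calls that each delay their response by one second via delayElement: StepVerifier.withVirtualTime installs a VirtualTimeScheduler, so if zip subscribes to both sources eagerly, advancing virtual time by a single second is enough for the tuple to arrive, whereas a sequential version would need two.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;

public class ZipConcurrencyTest {

    // Hypothetical clients: each response is delayed by 1s.
    Mono<String> client1Call() {
        return Mono.just("one").delayElement(Duration.ofSeconds(1));
    }

    Mono<String> client2Call() {
        return Mono.just("two").delayElement(Duration.ofSeconds(1));
    }

    public Mono<Tuple2<String, String>> getPair() {
        return Mono.zip(client1Call(), client2Call());
    }

    public static void main(String[] args) {
        // withVirtualTime swaps in a VirtualTimeScheduler, so delayElement
        // (which uses the default parallel scheduler) runs on virtual time.
        // Advancing by only 1s must be enough if the two calls run
        // concurrently; a sequential chain would need 2s.
        StepVerifier.withVirtualTime(() -> new ZipConcurrencyTest().getPair())
                .thenAwait(Duration.ofSeconds(1))
                .expectNextMatches(t -> t.getT1().equals("one")
                        && t.getT2().equals("two"))
                .verifyComplete();
    }
}
```

The key detail is that the Mono must be built inside the withVirtualTime supplier, otherwise the delays are scheduled on the real parallel scheduler before the virtual one is installed.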
    Gleb Nikitenko
    Hi guys!
    I am very glad that I found Reactor and this community.
    I'm probably the only one in the world who uses Reactor in Android development ))
    And I want to say that in almost 2 years we have not had any problems in production.
    Thanks to everyone who takes part in the development. I find Reactor a very beautiful, tasty reactive solution (not only for big EE): gorgeous documentation quality and clean Java 8 integration. It is a pity that in the Android space it is still perceived as wild and non-obvious )
    Simon Baslé
    thanks @Nik-Gleb :smile:
    @bsideup, hi, I am new to reactive programming. I watched your talk at @s1p @Austin.
    There, you gave an example of how errors are side effects too:
    use the handle method for signalling exceptions, not throw new SomeException() and the like.
    @bsideup, can you please give the best usage and an example for the Flux#handle method? I want to learn.
    Sergei Egorov
    You don't throw, you "pass" exception
    With sink.error(new SomeException())
    Sorry, I didn't get that. Could you give some kind of example please?
    This is my reference from your talk.
    What do the sink.next() and sink.error() methods do there? Any basic example to better understand "sink"? Please help me.
    Sergei Egorov
    sink is a handler, you use it to pass the signal to the next operator
    think of a callback-based API:
    function doSomething(item, callback) {
        if (itemIsGood(item)) {
            callback(convertItem(item), null);
        } else {
            callback(null, new Error("something is wrong with the item"));
        }
    }
    ok, this I understood.
    And what is the major difference between throw new SomeException() and sink.error()?
    And how is throw new SomeException() a side effect?
    Sergei Egorov
    although .map will handle the exception and propagate it downstream as an error, it is an "exceptional" case, hence a side effect.
    Also, as mentioned in the talk, when the exception is checked, you must wrap it with some unchecked wrapper (like RuntimeException).
    Last but not least, you make it more explicit which error cases you handle and how you process them, instead of just throwing things around, imperative-style.
    You can also use sink.success() (or sink.complete() for a Flux) instead of passing an error and handling it later in the same operator (in the catch block), to use fewer operators.
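A minimal sketch of Flux#handle along those lines, using a hypothetical string-parsing step: sink.next passes a value downstream, and sink.error passes the error as a signal rather than throwing it inside the operator.

```java
import reactor.core.publisher.Flux;

public class HandleExample {

    public static void main(String[] args) {
        Flux.just("1", "2", "oops", "4")
                .<Integer>handle((item, sink) -> {
                    try {
                        // sink.next passes a value to the next operator
                        sink.next(Integer.parseInt(item));
                    } catch (NumberFormatException e) {
                        // sink.error passes an error signal downstream
                        // instead of throwing out of the operator
                        sink.error(new IllegalArgumentException(
                                "bad item: " + item, e));
                    }
                })
                .subscribe(
                        value -> System.out.println("got " + value),
                        error -> System.out.println("error: " + error.getMessage()));
    }
}
```

Here the sequence terminates with the error after "1" and "2" are delivered; calling sink.complete() instead would end the sequence normally at the bad item.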
    Thank you @bsideup ..thanks for explanation.
    Sergei Egorov
    Happy to help!
    I have one requirement:
    we are developing an application with Reactor, but the database is Oracle, which is blocking.
    Now the problem is: when I am doing 10 validations, each requires an Oracle DB connection, so 10 DB connections in total.
    While hitting the API for performance testing with just 60 requests using Gatling (the Hikari connection pool size is 30),
    we are getting "unable to get DB connection" exceptions.
    Please help me out of this situation.
    @bsideup @simonbasle
    Sergei Egorov
    @kota_sunilkumar_twitter please do not post in both reactor/reactor, reactor/reactor-core is enough
    Ok, I will not repeat that, my mistake. Thanks for correcting me.
    @bsideup, any suggestions on my above puzzle?
    Sergei Egorov
    @kota_sunilkumar_twitter this does not really seem to be Reactor-related, since you're running into db connection exceptions
    make sure that you actually use the pool and not create a new connection every time
    I'm looking to take a list of Mono<Void> instances and execute them all in parallel. Would Flux.concat(...).parallel().subscribe() do the trick? Or would I need to call subscribe() on each of the Monos individually with different Schedulers?
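For the question above, a hedged sketch with hypothetical delayed tasks: Flux.concat subscribes to its sources one after another, so it would not run the Monos in parallel, while Mono.when (or Flux.merge) subscribes to all of them eagerly, with no need for a separate subscribe or Scheduler per Mono.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Mono;

public class ParallelMonos {

    // Hypothetical Mono<Void> task taking ~100ms.
    static Mono<Void> task(String name) {
        return Mono.delay(Duration.ofMillis(100))
                .doOnNext(i -> System.out.println("done " + name))
                .then();
    }

    public static void main(String[] args) {
        List<Mono<Void>> tasks = List.of(task("a"), task("b"), task("c"));

        long start = System.nanoTime();
        // Mono.when subscribes to all sources eagerly, so the three
        // delays run concurrently; Flux.concat would chain them.
        Mono.when(tasks).block();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        // With concurrent subscription this is close to 100ms, not 300ms.
        System.out.println("elapsed: " + elapsedMs + "ms");
    }
}
```

Mono.when is appropriate when only completion matters; use Flux.merge if the elements themselves are needed.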
    Sumit Dhaniya
    I'm creating a hot flux which publishes on the parallel scheduler. A subscriber consuming this flux runs in parallel on a new bounded elastic scheduler. While doing some testing I set the thread cap and the queued-task cap to 1, and the program started throwing an error, which baffled me at first, but after closely going through the documentation I figured out it's because no more tasks can be enqueued.
    Now I'm a little confused, because if I increase the queued-task cap to 4-5 it works again, but the queue should have overflowed in that case too, as it's processing hundreds of elements.
    @bsideup @simonbasle, does calling doAfterTerminate on filter(exchange) work asynchronously
    (as in filter(exchange).doAfterTerminate())? If not, how can I make it asynchronous?
    Hi everyone
    I'm kinda new to Reactor; I've read the basics and all, but I'm a bit stuck on my implementation and need some help.
    I've created an endpoint which wraps my business endpoint with data model transformation. My new endpoint calls 3 methods which each return a Mono<MyObject>. The first one transforms "outsideModel" to "internalModel" (through a microserviceA), then I call my business microserviceB, and finally I call my microserviceA again to transform back from internalModel to outsideModel. As my microserviceA is stateless, I need the original data of the "outsideModel" from step 1. My 3 Mono calls are chained, and I'm not sure how I can store/save the original "outsideModel" and reuse it when I need it for my last step. Can I simply store it in a local variable?
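One common answer, as a sketch with hypothetical models and service methods standing in for microserviceA/microserviceB: rather than a mutable local variable, keep the original value in scope by nesting the final flatMap, so the last lambda simply closes over it.

```java
import reactor.core.publisher.Mono;

public class ChainedCalls {

    // Hypothetical models and calls standing in for the real microservices.
    record OutsideModel(String payload) {}
    record InternalModel(String payload) {}

    static Mono<InternalModel> toInternal(OutsideModel o) {
        return Mono.just(new InternalModel(o.payload().toUpperCase()));
    }

    static Mono<InternalModel> business(InternalModel i) {
        return Mono.just(new InternalModel(i.payload() + "!"));
    }

    static Mono<OutsideModel> toOutside(InternalModel i, OutsideModel original) {
        // The stateless back-transformation can use the original data.
        return Mono.just(new OutsideModel(original.payload() + " -> " + i.payload()));
    }

    static Mono<OutsideModel> endpoint(OutsideModel outside) {
        // 'outside' stays visible to the nested lambda, so the last step
        // reuses it without any shared mutable state.
        return toInternal(outside)
                .flatMap(ChainedCalls::business)
                .flatMap(result -> toOutside(result, outside));
    }

    public static void main(String[] args) {
        System.out.println(endpoint(new OutsideModel("hello")).block().payload());
    }
}
```

A plain local variable works too when it is effectively final, but closing over the pipeline's input like this avoids any mutable holder and stays safe if the chain is subscribed more than once.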
    I'm a bit lost... I have a list of objects (id, parentId, children) and initially only id and parentId are populated. Based on id and parentId, I need to turn them into a hierarchy and populate children accordingly. I've been poking around but have no solution yet... any hints?
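One possible hint, as plain Java (the Node shape here is hypothetical): index all nodes by id in one pass, then attach each node to its parent's children list in a second pass, collecting nodes with no known parent as roots.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TreeBuilder {

    // Hypothetical node matching the description: id, parentId, children.
    static class Node {
        final String id;
        final String parentId; // null for root nodes
        final List<Node> children = new ArrayList<>();

        Node(String id, String parentId) {
            this.id = id;
            this.parentId = parentId;
        }
    }

    // Two passes over the flat list: O(n) with a HashMap index.
    static List<Node> buildHierarchy(List<Node> flat) {
        Map<String, Node> byId = new HashMap<>();
        for (Node n : flat) {
            byId.put(n.id, n);
        }
        List<Node> roots = new ArrayList<>();
        for (Node n : flat) {
            Node parent = n.parentId == null ? null : byId.get(n.parentId);
            if (parent == null) {
                roots.add(n);       // no parent in the list: treat as root
            } else {
                parent.children.add(n);
            }
        }
        return roots;
    }

    public static void main(String[] args) {
        List<Node> flat = List.of(
                new Node("1", null),
                new Node("2", "1"),
                new Node("3", "1"),
                new Node("4", "2"));
        List<Node> roots = buildHierarchy(flat);
        System.out.println(roots.size() + " root(s), first has "
                + roots.get(0).children.size() + " children");
    }
}
```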
    Alvaro Sevilla
    Hi! Hope this is the right place to ask; I would really appreciate any input.
    I have a Flux instance that I'm iterating through using toIterable. I can't find a way to cancel the underlying generator if anything goes wrong on the consumer side; it will only close once I've iterated through enough items to exhaust it. Is there any way to trigger a cancellation event on a Flux that is being consumed through toIterable()?
    Scott White
    Any thoughts on Reactor vs Ktor, specifically the HTTP client?
    Hongde Liu

    question about 'request' method

    final Flux<Boolean> randoms = Flux.generate(sink -> sink.next(true));
    randoms.filter(x -> x == true).subscribe(x -> System.out.println(x));

    As I understand it, for the above code the LambdaSubscriber will request data from the FilterFuseableSubscriber (the downstream, LambdaSubscriber, requests data from the upstream, FilterFuseableSubscriber), and the FilterFuseableSubscriber will in turn request data from its upstream in:

        public void request(long n) {

    My question is: why is the request number the same as the downstream's? If the downstream requests 5 elements, the filter should request more than 5, since it will filter some data out. Can somebody explain the logic? :)
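The behavior can be observed from the outside with a sketch like this (the BaseSubscriber here deliberately requests only 2 elements): the filter initially forwards the downstream request unchanged, and then compensates by requesting one more element from upstream each time it drops one in onNext, so over the life of the subscription the upstream sees more than n requested in total.

```java
import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class FilterRequestDemo {

    public static void main(String[] args) {
        Flux.range(1, 10)
                // logs every request the filter makes to its upstream
                .doOnRequest(r -> System.out.println("upstream request: " + r))
                .filter(x -> x % 2 == 0)
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription s) {
                        // downstream asks for exactly 2 elements
                        request(2);
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        System.out.println("received: " + value);
                    }
                });
    }
}
```

Typically this logs an initial upstream request of 2, then an extra request of 1 for each odd element the filter drops, until the 2 even elements have been delivered.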

    Kirill Marchuk
    Hi. I would like to trigger a reactive action when some particular error occurs upstream and then return the original error. I wanted to use .onErrorMap, but it says it's for "synchronous" functions only.
    .onErrorResume then?
    Sergei Egorov
    yes, onErrorResume allows you to handle the error asynchronously
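A sketch of that pattern with a hypothetical asynchronous reporting step: onErrorResume runs the reactive side action and then re-emits the original error with Mono.error, so downstream still observes the failure.

```java
import reactor.core.publisher.Mono;

public class OnErrorResumeExample {

    // Hypothetical asynchronous side action, e.g. writing an audit record.
    static Mono<Void> reportFailure(Throwable error) {
        return Mono.fromRunnable(() ->
                System.out.println("reported: " + error.getMessage()));
    }

    public static void main(String[] args) {
        Mono<String> source = Mono.error(new IllegalStateException("boom"));

        Mono<String> withReporting = source.onErrorResume(e ->
                // run the reactive side action, then re-emit the
                // original error so it still propagates downstream
                reportFailure(e).then(Mono.error(e)));

        withReporting.subscribe(
                value -> System.out.println("value: " + value),
                error -> System.out.println("error: " + error.getMessage()));
    }
}
```

Only the error path runs here: the failure is reported first, then the original IllegalStateException reaches the error consumer unchanged.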
    why does Schedulers.parallel() return true for Schedulers.isInNonBlockingThread() and Schedulers.elastic() not?
    Sergei Egorov
    @KalyanMiryala_twitter because elastic scheduler is usually for blocking tasks
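A small sketch illustrating this (using boundedElastic as the blocking-friendly scheduler): threads created by Schedulers.parallel() implement Reactor's NonBlocking marker interface, which is what Schedulers.isInNonBlockingThread() checks, and is also why blocking calls such as block() are rejected on those threads.

```java
import java.util.concurrent.CountDownLatch;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class NonBlockingCheck {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);

        // parallel() workers are marked NonBlocking: prints true.
        Mono.fromCallable(Schedulers::isInNonBlockingThread)
                .subscribeOn(Schedulers.parallel())
                .doFinally(s -> latch.countDown())
                .subscribe(b -> System.out.println("parallel non-blocking: " + b));

        // boundedElastic() workers are meant for blocking work,
        // so they are NOT marked NonBlocking: prints false.
        Mono.fromCallable(Schedulers::isInNonBlockingThread)
                .subscribeOn(Schedulers.boundedElastic())
                .doFinally(s -> latch.countDown())
                .subscribe(b -> System.out.println("boundedElastic non-blocking: " + b));

        latch.await();
    }
}
```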
    @bsideup thank you for the response. I am trying to understand why we have the requestOnSeparateThread flag when we have separate schedulers for blocking and non-blocking Reactor calls. I have used requestOnSeparateThread in my application but couldn't see any difference when setting requestOnSeparateThread = true/false.

    Hello Reactor team.

    Static composition can be done with the combine() method, but I want a way to manage flux composition dynamically.
    I would like to use it for an IoT project where the UI can listen to one or more sensors, depending on what the user wants.
    The resulting flux is given to a web socket.
    Subscribing to or unsubscribing from a sensor flux is done with a REST service.

    I would like to know if DirectProcessor is a good way to create a Flux multiplexer from other Fluxes.
    I created a SimulatorModule to test how I can combine different fluxes into one flux with a DirectProcessor.
    It seems to work fine (in reality a sensor produces 1 data point every 3 minutes, so backpressure is not a problem for now).

    I also tried with the FluxSink from DirectProcessor#sink() instead of DirectProcessor#onNext(), but it seems to be way less performant!
    What is the difference between these two ways of feeding the DirectProcessor?

    I also tried EmitterProcessor; it seems to be less performant. Is that the case?
    Is EmitterProcessor the only way to manage backpressure?

    public class SimulatorModule {
        private DirectProcessor<Triple> dp;
        private HashMap<String, Disposable> producers = new HashMap<>();

        public SimulatorModule() {
            dp = DirectProcessor.create();
            long start = Instant.now().toEpochMilli(); // unused in this snippet
        }

        public void simulate(String name, long interval) {
            Disposable disposable = Flux.interval(Duration.ofMillis(interval))
                    .map(e -> new Triple(name, System.currentTimeMillis(), 1.0))
                    .subscribe(e -> dp.onNext(e));
            producers.put(name, disposable);
        }

        public void removeSimulator(String name) {
            Disposable disposable = producers.remove(name);
            if (disposable != null) {
                disposable.dispose();
            }
        }

        public Flux<Triple> flux() {
            return dp;
        }

        public static class Triple {
            public final String name;
            public final long ts;
            public final double value;

            public Triple(String name, long ts, double value) {
                this.name = name;
                this.ts = ts;
                this.value = value;
            }
        }
    }

    I call it with:

            SimulatorModule simulatorModule = new SimulatorModule();
            simulatorModule.flux()
                    .filter(e -> e.name.equals("F1"))
                    .subscribe(e -> System.out.println("1 " + e.name + " - " + Thread.currentThread().getName()));
            simulatorModule.flux()
                    .filter(e -> e.name.equals("F2"))
                    .subscribe(e -> System.out.println("2 " + e.name + " - " + Thread.currentThread().getName()));
            simulatorModule.simulate("F1", 1000);
            simulatorModule.simulate("F2", 1200);

    Thanks in advance for your help!