    amangarg4804
    @amangarg4804
    Hi, I am trying to call a validate method that has return type Mono<Void>, but I am not able to. Validate method can either return Mono.empty() or Mono.error(). Here is an example code (I want to call validate method inside processSomeData method):
    @Test
    public void testMonoVoid() {
        System.out.println(processSomeData(Mono.just("abcd")).block());
    }

    public Mono<String> processSomeData(Mono<String> input) {
        return input.flatMap(input1 -> {
            if (input1.equals("abcd")) {
                return Mono.just("something");
            }
            return Mono.just("somethingElse");
        });
    }

    public Mono<Void> validate(String s) {
        if (s.length() > 3) {
            return Mono.error(new RuntimeException("Length is greater than 3"));
        }
        return Mono.empty();
    }
    Sergei Egorov
    @bsideup
    @amangarg4804 you can use Mono#handle and change validate to BiFunction<String, MonoSink>, where you will either call sink.success(s) or sink.error and use it as: input.handle(this::validate).flatMap(input1 -> ...)
    amangarg4804
    @amangarg4804
    @bsideup The compiler says BiFunction should have 3 type arguments. Could you please show the validate method? In reality validate is a generic method that can take any object of type T. I created this example just to simplify the problem.
    Sergei Egorov
    @bsideup
    just extract .handle's argument
    amangarg4804
    @amangarg4804
    @bsideup I think you meant BiConsumer<String, MonoSink> instead of BiFunction<String, MonoSink> in your first message?
    Sergei Egorov
    @bsideup
    Yes, my bad! BiConsumer of course!
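
    The handle-based suggestion above can be sketched as follows. This is a minimal illustration, not the original poster's code: the class name HandleValidation and the "accepted:" / "rejected:" strings are assumptions made for the example. Note that Mono#handle passes a SynchronousSink to the callback, so the value is forwarded with sink.next(...) and a failure with sink.error(...).

```java
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

public class HandleValidation {

    // Shape-compatible with BiConsumer<String, SynchronousSink<String>>,
    // so it can be passed to handle(...) as a method reference.
    static void validate(String s, SynchronousSink<String> sink) {
        if (s.length() > 3) {
            // Pass the error downstream instead of throwing it.
            sink.error(new IllegalArgumentException("Length is greater than 3"));
        } else {
            // Forward the valid value to the next operator.
            sink.next(s);
        }
    }

    static Mono<String> processSomeData(Mono<String> input) {
        return input
                .<String>handle(HandleValidation::validate)
                .map(valid -> "accepted:" + valid);
    }

    public static void main(String[] args) {
        System.out.println(processSomeData(Mono.just("abc")).block());
        System.out.println(processSomeData(Mono.just("abcd"))
                .onErrorResume(e -> Mono.just("rejected:" + e.getMessage()))
                .block());
    }
}
```

    Because the callback is synchronous, this shape only fits validations that need no further reactive calls.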
    amangarg4804
    @amangarg4804

    Okay. When I use input.handle(this::validate), it wants me to create a validate method with the following signature.

    private <R> void validate(String s, SynchronousSink<R> rSynchronousSink) {
    }

    I don't see BiConsumer. Also, the return type of this method is void instead of Mono<Void> , there could be some validations that require accessing the database using reactive repositories and I would have to call block() on them in this validate method which I don't want to do. Actually I am new to reactor :| and unable to solve this problem. Could you please provide some more input?

    amangarg4804
    @amangarg4804
    I know it would be easier if this method, instead of returning Mono<Void>, returned Mono<Boolean> or some Mono<CustomObject>. Then I could use flatMap on it. But I am just wondering whether it is possible if this method returns Mono<Void>? I tried using flatMap on the Mono<Void> but it didn't work.
    Sergei Egorov
    @bsideup

    @amangarg4804 .handle(this::validate) automatically converts your method into BiConsumer. I would suggest reading about Java's lambdas and method references.

    If you need to do async validation, you can use: input.delayUntil(this::validate) where private Mono<Void> validate(String s)
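
    A minimal sketch of the delayUntil approach, keeping validate as a Mono<Void> that either completes empty or errors. The class name and the "processed:" / "rejected:" strings are illustrative assumptions. A Mono<Void> never emits a value, which is why a flatMap chained onto it is never invoked; delayUntil instead holds back the original value until the validation Mono completes, and propagates its error if it fails.

```java
import reactor.core.publisher.Mono;

public class DelayUntilValidation {

    // Same contract as in the question: empty completion means "valid", an error means "invalid".
    // A real implementation could call a reactive repository here without blocking.
    static Mono<Void> validate(String s) {
        if (s.length() > 3) {
            return Mono.error(new IllegalArgumentException("Length is greater than 3"));
        }
        return Mono.empty();
    }

    static Mono<String> processSomeData(Mono<String> input) {
        return input
                // The input value is re-emitted only after validate(...) completes successfully.
                .delayUntil(DelayUntilValidation::validate)
                .map(value -> "processed:" + value);
    }

    public static void main(String[] args) {
        System.out.println(processSomeData(Mono.just("abc")).block());
        System.out.println(processSomeData(Mono.just("abcd"))
                .onErrorResume(e -> Mono.just("rejected:" + e.getMessage()))
                .block());
    }
}
```

    An equivalent shape is input.flatMap(s -> validate(s).thenReturn(s)), which also turns the empty completion back into the original value.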

    Joshua Street
    @jjstreet
    hey all: question about Mono.zip()
    I have 2 classes that each use a spring WebClient to emit Monos of some data
    Say those classes are Client1 and Client2. I want to call each in a Mono.zip so that they can be done concurrently
    I am trying to verify that they are being done concurrently by using a VirtualTime scheduler
    except I cannot figure out how to create a mock delayed response to force me to add the right schedulers to my zip method arguments
    public Mono<Tuple2<String, String>> getPair() {
        return Mono.zip(
                client1.getFirstPart(),
                client2.getFirstPart());
    }
    i am pretty certain i will have to schedule those client methods on their own schedulers
    but i am not sure
    Joshua Street
    @jjstreet
    maybe to put as a one liner: I want a test that will force me to ensure concurrency with the Mono.zip operator using VirtualTimeScheduler so I am not waiting in realtime for tests to finish
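
    One way such a test could look, assuming reactor-test is on the classpath and the two clients can be stubbed. PartClient is a hypothetical stand-in for Client1 and Client2, and the 5-second delays are arbitrary. The stubs build their delayed Mono only when getFirstPart() is called, which happens inside the withVirtualTime supplier, so Mono.delay runs on the virtual clock. If the two calls ran one after the other, nothing would be emitted at the 5-second mark and the verification would fail.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;

public class ZipVirtualTimeSketch {

    // Hypothetical stand-in for Client1/Client2; the real classes wrap a WebClient.
    interface PartClient {
        Mono<String> getFirstPart();
    }

    static Mono<Tuple2<String, String>> getPair(PartClient client1, PartClient client2) {
        return Mono.zip(client1.getFirstPart(), client2.getFirstPart());
    }

    static Duration verifyConcurrentZip() {
        // Each stub answers after 5 virtual seconds.
        PartClient client1 = () -> Mono.delay(Duration.ofSeconds(5)).thenReturn("first");
        PartClient client2 = () -> Mono.delay(Duration.ofSeconds(5)).thenReturn("second");

        return StepVerifier.withVirtualTime(() -> getPair(client1, client2))
                .expectSubscription()
                .expectNoEvent(Duration.ofSeconds(4))   // nothing before the delays elapse
                .thenAwait(Duration.ofSeconds(1))       // advance the virtual clock to 5 s
                .expectNextMatches(pair ->
                        pair.getT1().equals("first") && pair.getT2().equals("second"))
                .expectComplete()
                .verify(Duration.ofSeconds(2));         // real-time safety timeout
    }

    public static void main(String[] args) {
        verifyConcurrentZip();
        System.out.println("pair arrived after 5 virtual seconds");
    }
}
```

    Mono.zip subscribes to all of its sources up front, so non-blocking sources such as these overlap without extra schedulers; a source that blocks on subscription would still need its own subscribeOn.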
    Gleb Nikitenko
    @Nik-Gleb
    Hi guys!
    I am very glad that I found the Reactor community.
    I'm probably the only one in this world who uses Reactor in Android development ))
    And I want to say that for almost 2 years we have not had any problems in production.
    Thanks to everyone who takes part in the development. I find Reactor a very beautiful, tasty reactive solution (not only for Big EE), with gorgeous documentation quality and clean Java 8 integration. It is a pity that in the Android space it is still perceived as wild and non-obvious)
    Simon Baslé
    @simonbasle
    thanks @Nik-Gleb :smile:
    kota.sunilkumar
    @kota_sunilkumar_twitter
    @bsideup, hi, I am new to reactive programming. I watched your talk at @s1p@Austin.
    There, you gave an example showing that errors are side effects too.
    You said to use the handle method to signal exceptions rather than throw new SomeException().
    @bsideup, can you please give the best usage of, and an example for, the Flux#handle method? I want to learn.
    Sergei Egorov
    @bsideup
    You don't throw, you "pass" exception
    With sink.error(new SomeException())
    kota.sunilkumar
    @kota_sunilkumar_twitter
    Sorry, I didn't get that. Could you give some kind of example, please?
    WyZaFQCC.jpg-medium.jpeg
    This is my reference from your talk
    What do the sink.next() and sink.error() methods do there? Any basic example to better understand "sink" would help. Please help me.
    Sergei Egorov
    @bsideup
    sink is a handler, you use it to pass the signal to the next operator
    think callback-based API:
    function doSomething(item, callback) {
        if (itemIsGood(item)) {
            // success: pass the converted item, no error
            callback(convertItem(item), null);
        } else {
            // failure: pass no item, only the error
            callback(null, new Error("something is wrong with the item"));
        }
    }
    kota.sunilkumar
    @kota_sunilkumar_twitter
    OK, this I understood.
    And what is the major difference between throw new exception() and sink.error()?
    And how is throw new exception() a side effect?
    Sergei Egorov
    @bsideup
    Although .map will catch the thrown exception and propagate it downstream, it is an "exceptional" case, hence a side effect.
    Also, as mentioned in the talk, when the exception is checked, you must wrap it with some unchecked wrapper (like RuntimeException).
    Last but not least, you're making it more explicit which error cases you handle and how you process them, instead of just throwing things around, imperative-style.
    You can also use sink.success() (or sink.complete() if Flux) instead of passing an error and handling it later, all in the same operator (in the catch block), to use fewer operators.
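
    To make the sink idea concrete, here is a small Flux#handle sketch. The class and method names are made up for the illustration. In handle, the sink is a SynchronousSink: for each input item the callback may call sink.next(...) at most once, call sink.error(...) or sink.complete() to end the sequence, or call nothing to drop the item.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxHandleSketch {

    // Bad input is passed downstream as an error signal rather than thrown.
    static Flux<Integer> parseStrict(Flux<String> items) {
        return items.handle((item, sink) -> {
            try {
                sink.next(Integer.parseInt(item));
            } catch (NumberFormatException e) {
                sink.error(new IllegalArgumentException("not a number: " + item, e));
            }
        });
    }

    // Bad input is dropped: calling nothing on the sink skips the item.
    static Flux<Integer> parseSkippingBad(Flux<String> items) {
        return items.handle((item, sink) -> {
            try {
                sink.next(Integer.parseInt(item));
            } catch (NumberFormatException e) {
                // intentionally ignored: this item is filtered out
            }
        });
    }

    // Bad input ends the sequence normally, with no error to handle later.
    static Flux<Integer> parseUntilBad(Flux<String> items) {
        return items.handle((item, sink) -> {
            try {
                sink.next(Integer.parseInt(item));
            } catch (NumberFormatException e) {
                sink.complete();
            }
        });
    }

    public static void main(String[] args) {
        Flux<String> input = Flux.just("1", "2", "x", "4");
        List<Integer> skipped = parseSkippingBad(input).collectList().block();
        List<Integer> stopped = parseUntilBad(input).collectList().block();
        System.out.println(skipped); // [1, 2, 4]
        System.out.println(stopped); // [1, 2]
        System.out.println(parseStrict(input)
                .map(String::valueOf)
                .onErrorResume(e -> Flux.just(e.getMessage()))
                .collectList()
                .block()); // [1, 2, not a number: x]
    }
}
```

    The three variants differ only in what the catch block tells the sink, which is the explicitness mentioned above: the error policy is visible in one operator.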
    kota.sunilkumar
    @kota_sunilkumar_twitter
    Thank you @bsideup ..thanks for explanation.
    Sergei Egorov
    @bsideup
    Happy to help!
    kota.sunilkumar
    @kota_sunilkumar_twitter
    I have one requirement.
    We are developing an application with Reactor, but the database is Oracle, which is blocking.
    Now the problem is: I am doing 10 validations, and each requires an Oracle DB connection, so 10 DB connections in total.
    When hitting the API for performance testing with just 60 requests using Gatling, with a Hikari connection pool size of 30,
    we are getting "unable to get DB connection" exceptions.
    Please help me out of this situation.
    @bsideup @simonbasle
    Sergei Egorov
    @bsideup
    @kota_sunilkumar_twitter please do not post in both reactor/reactor, reactor/reactor-core is enough
    kota.sunilkumar
    @kota_sunilkumar_twitter
    OK, I will not repeat that. My mistake, thanks for correcting me.
    @bsideup, any suggestions on my puzzle above?
    Sergei Egorov
    @bsideup
    @kota_sunilkumar_twitter this does not really seem to be Reactor-related, since you're running into db connection exceptions
    make sure that you actually use the pool and do not create a new connection every time
    blake-bauman
    @blake-bauman
    I'm looking to take a list of Mono<Void> instances and execute them all in parallel. Would Flux.concat(...).parallel().subscribe() do the trick? Or would I need to call subscribe() on each of the Monos individually with different Schedulers?
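
    A hedged sketch of one option for this: Flux.concat subscribes to its sources one at a time, while Mono.when(Iterable) subscribes to all of them up front and completes when every one has completed. The task below is a hypothetical non-blocking 100 ms delay that records how many tasks are in flight at once; the class and helper names are assumptions for the example.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ParallelVoidMonos {

    // Hypothetical task: counts itself as in flight from subscription until termination.
    static Mono<Void> task(AtomicInteger inFlight, AtomicInteger maxInFlight) {
        return Mono.delay(Duration.ofMillis(100))
                .doOnSubscribe(s -> maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max))
                .doOnTerminate(inFlight::decrementAndGet)
                .then();
    }

    // Runs three tasks through the given combinator and reports the peak number in flight.
    static int maxConcurrency(Function<List<Mono<Void>>, Mono<Void>> combinator) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        List<Mono<Void>> tasks = Arrays.asList(
                task(inFlight, maxInFlight),
                task(inFlight, maxInFlight),
                task(inFlight, maxInFlight));
        combinator.apply(tasks).block();
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        // All three subscribed at once.
        System.out.println("Mono.when:   " + maxConcurrency(tasks -> Mono.when(tasks)));
        // One after the other.
        System.out.println("Flux.concat: " + maxConcurrency(tasks -> Flux.concat(tasks).then()));
    }
}
```

    If the individual Monos wrap blocking work, subscribing eagerly is not enough on its own: each would also need something like subscribeOn(Schedulers.boundedElastic()) so the blocking calls run on separate threads.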
    Sumit Dhaniya
    @SumitDhaniya_twitter
    I’m creating a hot Flux which publishes on a parallel scheduler. A subscriber consuming this Flux runs in parallel on a new bounded elastic scheduler. While doing some testing I set the thread cap and queue cap to 1, and the program started throwing an error. That baffled me at first, but after closely going through the documentation I figured out it happens when no more tasks can be enqueued.
    Now I’m a little confused, because if I increase the queued task cap to 4-5 it works again, but its queue should have overflowed in this case too, as it’s processing hundreds of elements.
    kota.sunilkumar
    @kota_sunilkumar_twitter
    @bsideup @simonbasle, does calling doAfterTerminate on filter(exchange) work asynchronously?
    (like filter(exchange).doAfterTerminate()) If not, how can I make it asynchronous?
    M3yo
    @M3yo
    Hi everyone
    I'm kinda new to Reactor. I've read the basics and all, but I'm a bit stuck on my implementation and need some help.