Hugo Barrigas
@hfgbarrigas
hi :wave: any pointers on this one? reactor/reactor-core#1832
Davin Kevin
@davinkevin

Hi everyone,

I'm looking for something fairly specific, but I haven't had any success searching for it on Google...

I would like to do something that seems simple:

If a Mono is empty, I need to return something coming from another function.

aMono.switchIfEmpty { anotherFunction() }

If this same mono is NOT empty, I need to return an empty mono.

aMono.flatMap { Mono.empty() }

But from my point of view I can't use both at the same time, because they will be applied one after the other, and in my case I only want one of them to run.

Do you have any idea about this?
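
One possible way to get this "inverted" behaviour is a minimal sketch built on Mono.hasElement(), which turns the source into a Mono<Boolean> so that exactly one branch is chosen. The class name, the invert helper, and the body of anotherFunction() are hypothetical stand-ins, not the poster's code.

```java
import reactor.core.publisher.Mono;

public class EmptySwitch {
    // Hypothetical stand-in for the poster's anotherFunction().
    static Mono<String> anotherFunction() {
        return Mono.just("fallback");
    }

    // Empty source -> result of anotherFunction(); non-empty source -> empty Mono.
    // anotherFunction() is only invoked inside the lambda, i.e. when the source was empty.
    public static <T> Mono<String> invert(Mono<T> source) {
        return source.hasElement()
                .flatMap(present -> present ? Mono.<String>empty() : anotherFunction());
    }
}
```

Because the decision is made on the Boolean, the switchIfEmpty and "map to empty" steps no longer chain into each other.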

SRohitKumar
@SRohitKumar

Hi, can anyone please give me a good example or reference for calling a web service in parallel and then executing a task after all the calls have completed?

public Flux<Map<String, String>> deAuthorizeUsersFromVehicles(List<AuthorizationAttributes> attributes) {
    Map<String, String> ResponseObject = new HashMap<>();
    auResourcesClient.getVehicle(attributes.get(0).getVin())
        .doOnSuccess(vehicle -> {
            attributes.stream().forEach(attribute -> {
                auIamClient.deAuthorizeOrDeleteVehicle(attribute.getUserId(), vehicle.getId())
                    .doOnError(t -> {
                        ResponseObject.put(attribute.getUserId(), t.getMessage());
                    }).doOnSuccess(t -> {
                        ResponseObject.put(attribute.getUserId(), "SuccessFull");
                    }).subscribe();
            });
        });
    Flux<Map<String, String>> flux1 = Flux.just(ResponseObject);
    System.out.println("SuccessFull Completion");
    return flux1;
}

Here I am trying to call the deAuthorizeOrDeleteVehicle endpoint and build my own response object, but I am not able to make this code execute sequentially. I am pretty new to reactive programming. Any help will be appreciated. :)
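
A rough sketch of one way to restructure this: keep everything in a single chain, let flatMap subscribe to the per-user calls, and use collectMap so the map is emitted only after every call has finished. The deAuthorize method below is a hypothetical stand-in for auIamClient.deAuthorizeOrDeleteVehicle, and the String ids replace the poster's own types; with a real non-blocking client the inner calls would run concurrently.

```java
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DeauthorizeSketch {
    // Hypothetical stand-in for auIamClient.deAuthorizeOrDeleteVehicle(userId, vehicleId).
    static Mono<Void> deAuthorize(String userId, String vehicleId) {
        return userId.startsWith("bad")
                ? Mono.error(new IllegalStateException("rejected " + userId))
                : Mono.empty();
    }

    // One call per user; the resulting map is emitted once all calls have completed.
    public static Mono<Map<String, String>> deAuthorizeAll(Mono<String> vehicleId, List<String> userIds) {
        return vehicleId.flatMap(id -> {
            Flux<Map.Entry<String, String>> results = Flux.fromIterable(userIds)
                    .flatMap(userId -> deAuthorize(userId, id)
                            .thenReturn("Successful")
                            // A failed call becomes a map entry instead of failing the whole chain.
                            .onErrorResume(t -> Mono.just(String.valueOf(t.getMessage())))
                            .map(status -> Map.entry(userId, status)));
            return results.collectMap(Map.Entry::getKey, Map.Entry::getValue);
        });
    }
}
```

Nothing is subscribed inside a side-effect callback here, so the caller (or the framework) subscribes once and receives the finished map.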

radhakrishnanch
@radhakrishnanch
Spring WebFlux on Tomcat keeps logging the warning “NioEndpoint - Incorrect connection count, multiple calls to socket.close for the same socket.” Please help.
violetagg
@violetagg
You should ask this on stackoverflow with tag spring-webflux
radhakrishnanch
@radhakrishnanch
Thank You @violetagg
Pieter Pletinckx
@pspletinckx
I'm trying to develop a CAS auth module that works well with spring-webflux, and I'm having a really hard time finding the "right way to do security".
For example, if you write a webfilter, how do you make use of the "Remember me" that is present on .formlogin() and .basiclogin()?
Zoraida Hidalgo
@zoraida
Hi all, I am new to Reactor and I am trying to implement an Elasticsearch async client that can consume query results as a stream. For this purpose, I want to use the scroll API (https://www.elastic.co/guide/en/elasticsearch/reference/7.2/search-request-scroll.html). To summarize, for a given query I will make N requests. Since each request needs a "token" that is supplied by the previous response, the requests will be executed sequentially.
I have written a piece of code that abstracts this idea:
public static <I, O> Flux<O> pagination(Pair<I, Optional<O>> input, Function<Pair<I, Optional<O>>, Mono<O>> monoF, Function<O, Boolean> continueF) {
    return monoF.apply(input).flatMapMany(o -> {
        if (continueF.apply(o)) return Mono.just(o).concatWith(pagination(Pair.with(input.getValue0(), Optional.of(o)), monoF, continueF));
        else return Mono.empty();
    });
}
monoF is the function that does a single request to the scroll API. It needs Pair<I, Optional<O>> where I is any additional params and Optional<O> is the previous response (that may exist or not).
continueF is a function that decides if more requests need to be done
My question is, is this executed recursively on the stack? If yes, is there any way to avoid it?
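
For comparison, here is a small sketch of the same "each response yields the next request" pattern written with Reactor's expand operator instead of explicit recursion. Page, fetch, and scroll are hypothetical names, and fetch only simulates three pages of two hits each rather than calling Elasticsearch. Unlike the snippet above, every fetched page is emitted, including the last one.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ScrollSketch {
    // Hypothetical page type: the hits plus the token for the next request (null on the last page).
    public static final class Page {
        public final List<Integer> hits;
        public final String nextToken;

        Page(List<Integer> hits, String nextToken) {
            this.hits = hits;
            this.nextToken = nextToken;
        }
    }

    // Hypothetical stand-in for a single scroll request.
    static Mono<Page> fetch(String token) {
        return Mono.fromSupplier(() -> {
            int start = token == null ? 0 : Integer.parseInt(token);
            String next = start + 2 < 6 ? String.valueOf(start + 2) : null;
            return new Page(List.of(start, start + 1), next);
        });
    }

    // expand() applies the function to each emitted page until it returns an empty publisher.
    public static Flux<Page> scroll() {
        return fetch(null)
                .expand(page -> page.nextToken != null ? fetch(page.nextToken) : Mono.<Page>empty());
    }
}
```

Each follow-up request is created only after the previous page has been emitted, so the requests stay sequential.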
Trần Tiến Đức
@trantienduchn
how can I get then remove an item manually from a Flux, like a queue does?
Mono<Object> item = flux.poll(); // that is the method I imagine, get the next item then remove it
vanseverk
@vanseverk
@trantienduchn I'd suggest using a filter
it does not make much idiomatic sense to really call it "removing" though
Davin Kevin
@davinkevin

@trantienduchn depends on what you want to do next with the flux...

If you only want to take the first value of the flux, I think the Mono.from(flux) will do the job (in Kotlin, we can use toMono() to do the same)

Davin Kevin
@davinkevin
Following up on my earlier question, I've opened a question on Stack Overflow => https://stackoverflow.com/questions/57497981/return-mono-empty-if-a-value-is-found-but-without-executing-other-steps
radhakrishnanch
@radhakrishnanch
Spring Webflux - Tomcat NioEndpoint - Incorrect connection count issue. https://stackoverflow.com/questions/57483389/nioendpoint-incorrect-connection-count-multiple-calls-to-socket-close-for-the . Please help me.
Trần Tiến Đức
@trantienduchn

What I want is a lazy data source (so I chose Flux) that only provides items on demand, rather than being traversed all at once. Here is my draft implementation:

class MyInputStream extends InputStream {

    private final Flux<Byte> bytes;

    MyInputStream(Flux<byte[]> fluxBytes) {
        this.bytes = fluxBytes.flatMapSequential(byteArr -> Flux.fromArray(ArrayUtils.toObject(byteArr)));
    }

    @Override
    public int read() throws IOException {
        return bytes.next().blockOptional().map(Byte::intValue).orElse(-1); // but .next() always returns the first element, every time.
    }
}

Is there any way possible with Flux?

Trần Tiến Đức
@trantienduchn
Basically I want to convert a Flux<Byte> to an InputStream
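
A minimal sketch of one way to do this, assuming blocking reads are acceptable on the calling thread: Flux.toIterable subscribes once and hands out elements in order through an Iterator, so each read() continues where the previous one stopped instead of resubscribing. The class name FluxInputStream is hypothetical. The `& 0xFF` mask keeps bytes above 0x7F from being read as negative values, which would otherwise collide with the -1 end-of-stream marker.

```java
import java.io.InputStream;
import java.util.Iterator;
import reactor.core.publisher.Flux;

public class FluxInputStream extends InputStream {
    private final Iterator<byte[]> chunks;
    private byte[] current = new byte[0];
    private int pos = 0;

    public FluxInputStream(Flux<byte[]> source) {
        // Single subscription; a batch size of 1 requests chunks from upstream one at a time.
        this.chunks = source.toIterable(1).iterator();
    }

    @Override
    public int read() {
        // Move to the next non-empty chunk, blocking until it arrives or the Flux completes.
        while (pos >= current.length) {
            if (!chunks.hasNext()) {
                return -1;
            }
            current = chunks.next();
            pos = 0;
        }
        return current[pos++] & 0xFF;
    }
}
```

Since the iterator blocks while waiting for data, this is only suitable for threads that are allowed to block.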
shlomishem
@shlomishem
Hi
I want to convert ListenableFuture<V> into Mono<V>, (I can't use spring 5). Is this a good method?
public static <V> Mono<V> listenableFutureSupplierToMono(Supplier<ListenableFuture<? extends V>> listenableFutureSupplier) {
        return Mono.create(sink -> {
            try {
                ListenableFuture<? extends V> future = listenableFutureSupplier.get();
                future.addListener(() -> {
                    if (future.isDone()) {
                        try {
                            sink.success(future.get());
                        } catch (ExecutionException cause) {
                            sink.error(cause.getCause());
                        } catch (Exception cause) {
                            sink.error(cause);
                        }
                    }
                }, Runnable::run);
                sink.onCancel(() -> future.cancel(true));
            } catch (Exception cause) {
                sink.error(cause);
            }
        });
    }
Mark Paluch
@mp911de
This is Guava's ListenableFuture and not Spring's
shlomishem
@shlomishem
Yes forgot to mention
Is this a good way to convert it into reactor mono?
Mark Paluch
@mp911de
This approach looks okay. I think there are various ways how to build an adapter and this one looks good.
shlomishem
@shlomishem
Thanks Mark
Mark Paluch
@mp911de
One thing: Please make sure that future.get() does not return null, otherwise sink.success(…) fails.
If you encounter null, call success() without an argument.
shlomishem
@shlomishem

I checked this simple code:

Mono.create(sink -> sink.success(null))
                .doOnNext(o -> System.out.println("next " + o))
                .doOnSuccess(o -> System.out.println("success " + o))
                .doOnError(Throwable::printStackTrace)
                .subscribe();

The output is success null which means that we get an empty mono and not error.

Trần Tiến Đức
@trantienduchn
I think just this is enough, in case you don't care about handling cancellation:
ListenableFuture future;
Mono.fromCallable(future::get);
shlomishem
@shlomishem
@trantienduchn futures are eager but Mono should be lazy
Trần Tiến Đức
@trantienduchn
@shlomishem so you can pass the supplier<Future> into Mono.fromSupplier(), that is the argument of your method above.
shlomishem
@shlomishem
but future.get is blocking
niksw7
@niksw7
Hello. I was reading about flatMap in the documentation, which states the following:
this operator does not necessarily preserve original ordering.
I tried many different examples but could not find a way to make flatMap change the ordering of the elements. Does anyone have an example where ordering is not maintained?
Martin Tarjányi
@martin-tarjanyi
@niksw7 did you experiment with delay? try adding different delay for each element, first one with the longest, etc.
niksw7
@niksw7
@martin-tarjanyi yes i did add a Thread.sleep(random)

fun main(args: Array<String>) {
    Flux.range(1, 10)
            .flatMap { doSomeIoOperation(it) }
            .subscribeOn(Schedulers.elastic())
            .subscribe { println(it) }
    Thread.sleep(50000)
}

val random = Random(100)
fun doSomeIoOperation(number: Int): Mono<Int> {
    return number.toMono().flatMap {
        val l = random.nextInt(1, 10) * 500L
        println("sleeping for $l")
        Thread.sleep(l)
        it.toMono()
    }
}
Dan Cohen-Smith
@dancohensmith
hey who can i talk to about reactor-kafka?
i want to understand how i am meant to use a sender if i have a single message coming in over http and i want to send that out
it makes no sense, as it only seems to take a flux
also the transactional support isn't clear
Martin Tarjányi
@martin-tarjanyi
@niksw7 it is sequential because subscribeOn operator is used after the flatMap, which essentially makes your Flux single-threaded. However, if you put subscribeOn inside the flatMap you will get the expected random ordering (and faster execution).
    Flux.range(1, 10)
            .flatMap { doSomeIoOperation(it).subscribeOn(Schedulers.elastic()) }
            .subscribe { println(it) }
Josh Fix
@joshfix
i’m working on a legacy java library, so it’s not spring and it’s not reactive. this library has a method that will make multiple http range requests to the same endpoint. i’d like to make these in parallel. is reactor a good fit for this, just to simply parallelize the code to make requests and block for the response? or am i better off just using an async http library?
Andreas Schilling
@styx_hcr_twitter
Without knowing all the gritty details I'd say the latter.
nikhilaroratgo
@nikhilaroratgo
@joshfix I used CompletableFuture for this, to aggregate the results of several HTTP calls
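
A small sketch of what the CompletableFuture approach could look like with only the JDK. fetchRange is a hypothetical stand-in for the library's blocking range request, and the common pool is used purely for brevity; a dedicated executor is usually a better fit for blocking I/O.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ParallelRanges {
    // Hypothetical stand-in for one blocking HTTP range request.
    static String fetchRange(int start, int end) {
        return "bytes " + start + "-" + end;
    }

    // Starts all requests first, then waits for each result; the output keeps the input order.
    public static List<String> fetchAll(List<int[]> ranges) {
        List<CompletableFuture<String>> futures = ranges.stream()
                .map(r -> CompletableFuture.supplyAsync(() -> fetchRange(r[0], r[1])))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}
```

Collecting the futures into a list before joining matters: joining inside the first stream would wait for each request before starting the next.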
René S
@reneschroeder0000
@bsideup it is. but i always prefer more concise expressions. for collections there exist filter and filterNot...