Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
ruslan
@unoexperto
@leonard84 I want to have delay before repeating. How would you suggest adding that ?
Leonard Brünings
@leonard84
would a fixed .delay() work?
ruslan
@unoexperto
@leonard84 Sure
Sergei Egorov
@bsideup
@unoexperto yes, I would not recommend using it. Instead, consider pushing a "marker" item and not ignore the argument of takeUntil
@unoexperto repeatWhen(it -> it.delayElements(ofSeconds(10))
Leonard Brünings
@leonard84
yeah the poison pill pattern would be better than an external state
ruslan
@unoexperto
@leonard84 Poison pill would be nice but unfortunately the queue is diskbased persistent queue which I do not control. What if I just do this ?
Flux.<Flux<UnparsedWalEntry>>generate(s -> {
                        UnparsedWalEntry entry;
                        if (!queue.isEmpty() && (entry = queue.pop()) != null)
                            s.next(Flux.just(entry));
                        else {
                            if (sourceThread.isDisposed())
                                s.complete();
                            else
                                s.next(FluxExt.delay(ofSeconds(10)).cast(UnparsedWalEntry.class));
                        }
                    }).flatMap(Function.identity())
Sumit Dhaniya
@SumitDhaniya_twitter
@bsideup Did I ask the question in wrong forum?
Sergei Egorov
@bsideup
@unoexperto just do something like:
Mono.fromCallable(() -> {
    if (isDisposed()) return PoisonPill.INSTANCE;

    var item = queue.pop();
    return item != null ? new Message(item) : null;
});
instead of null, you could also return something like EmptyQueue.INSTANCE and then use it as an indicator that a delay should be applied
ruslan
@unoexperto
@bsideup okay!
Leonard Brünings
@leonard84

@unoexperto I would modify it a bit

Mono.fromCallable(() -> {
    var item = queue.pop();
    if (item == null) {
        return isDisposed() ? PoisonPill.INSTANCE : null
    }
    return new Message(item);
});

this would drain you queue if it still has items but your producer has already been disposed.

Sumit Dhaniya
@SumitDhaniya_twitter
I’m actually new to Reactor so was playing with it and stumbled upon a strange scenario to test runOn documentation which says rails might be shared if available threads are less than parallelism and in runOn I used boundedElasticScheduler with queue cap size 1. After debugging the code I got the issue ( parallelism > threadcap * queue cap for bounded elastic scheduler )
But I found it little strange that after repeatedly asking on channel no one bothered to point into a direction
So just want to check is this a closed or maintainer only channel. If yes I will stop posting and apologies for long message 😅
Ugnius
@UgiR
Don’t know the answer to your question, but in general, if you make a question as clear as possible, and maybe include an example, its more likely someone will be able to answer it.
Jonathan Giles
@JonathanGiles
Hello friends :-) I've been chatting offline with Stephane about hot / cold publishers, and what Reactor can do to make it more obvious to end users whether these publishers are hot or cold. Stephane suggested it might be possible to introduce isHot/isCold style introspection APIs into the next major release (3.4). This leads to two questions:
  1. What are the thoughts of others in this channel on this topic?
  2. When is 3.4 going to be released?! :D
Mikael Elm
@mickeelm
@SumitDhaniya_twitter what @UgiR said. So I'm afraid I don't know the answer either. This channel is free for anyone to join, and I believe there are both "enthusiasts" and maintainers here. You can always try StackOverflow though, if you don't get a response here.
@JonathanGiles Wouldn't know about 2, but regarding 1, I don't see why not :)
Mikael Elm
@mickeelm
Anyone knows how I would go about if I want to use a retry with backoff and a Predicate? In the API now, the only variants with "support of adding a Predicate" are retry() and retry(numRetries).
Ugnius
@UgiR
Look on reactor-extra/adding
Addons*, there’s a Retry helper
Mikael Elm
@mickeelm
Ah, great, thanks!
Mikael Elm
@mickeelm
While I'm browsing the API...is there any legend/explanation for the symbols used in the diagrams? Like, "a color filled circle represents", "an empty circle with an X represents". I know there's this image which gives a general idea of the flow, but there are many more symbols and stuff going on.
The circles/marbles are just a generic representation of some element in the Flux
Mikael Elm
@mickeelm
Thanks a lot! Yes, the basic marbles are easy to understand, but for instance the symbols described here as "computations and side effects" are stuff are not clear on first sight IMO. I think it would be great if there was a link in for instance the Flux API page that first had the overview image that it has today, and then something like "for a thorough explanation of our marble diagrams, click here", explaining more than that.
kota.sunilkumar
@kota_sunilkumar_twitter
@bsideup @simonbasle , is calling doAfterTerminate on filter(exchange) work asynchronously ?
(like filter(exchange). doAfterTerminate ()) if not , how can make it Asynchronous?
Amit Thawait
@Amit-Thawait

Hi Everyone, I am newbie to project reactor.

I am trying to implement conditional retry based on the response of an API call, but I am not able to get the response of my API call in the predicate function of retry method based on which I want to trigger retry

webClient.post(
uri = uri,
body = body,
headers = headers(),
paramClass = Response::class.java
)
.map { it.status }
.retry { <the response is not accessible here> }

Can someone please help me with this

kota.sunilkumar
@kota_sunilkumar_twitter
Hi EveryOne, does any one know, how to Get request body string from ServerHttpRequest / Flux<DataBuffer> ?
kota.sunilkumar
@kota_sunilkumar_twitter
The purpose of above line is, I need requestBody in doAfterTerminate() method in filter, by which I can do my audit related things.
violetagg
@violetagg
@kota_sunilkumar_twitter I would suggest you to ask on the appropriate channel i.e. Spring WebFlux, this one is for project Reactor
Mikael Elm
@mickeelm

@Amit-Thawait retry() will only get called on errors. From the docs:

retry(Predicate<? super Throwable> retryMatcher)

Re-subscribes to this Flux sequence if it signals any error that matches the given Predicate, otherwise push the error downstream.

Meaning, the predicate acts on the error/throwable, not on the response.
There are a couple of ways to achieve what you want. One is to signal an error/throw an Exception if the response doesn't match what you want (for instance in map(), then have retry() match on that exception (or retry on any error, i.e. without a predicate, if you wish)

Amit Thawait
@Amit-Thawait
Ok. Thanks for your response @mickeelm
Niranjan
@nnanda2016
@Amit-Thawait what do you mean by conditional response on API call? do you want to retry based on http response codes?
Niranjan
@nnanda2016
if that’s the requirement, then you have to attach an exchange filter during the WebClient bean creation and throw a custom exception and attach a retry there. Something like shown below
public WebClient webClient() {
    return WebClient.builder()
            .baseUrl(baseUrl)
            .filter((request, next) ->
                    next.exchange(request)
                        .doOnNext(clientResponse -> {
                            final HttpStatus httpStatus = clientResponse.statusCode();

                            // We will retry for 4xx errors.
                            if (httpStatus.is4xxClientError()) {
                                throw new CustomHttp4xxStatusException("4xx error while invoking API.");
                            }

                            // We will retry for 5xx errors.
                            if (httpStatus.is5xxServerError()) {
                                throw new CustomHttp5xxStatusException(“5xx error while invoking API.");
                            }
                        })
                        .retryWhen(Retry.anyOf(CustomHttp4xxStatusException.class, CustomHttp5xxStatusException.class, ReadTimeoutException.class, WriteTimeoutException.class, SSLException.class)
                                    .retryMax(maxRetryAttempt)
                                    .backoff(Backoff.exponential(Duration.ofMillis(1000), Duration.ofMillis(maxRetryWindowInMillis), 3, true))
                                    .jitter(Jitter.random(0.9))
                                    .doOnRetry(retryContext -> logger.error("[Invoking API][Retry context: {}]", retryContext))
                        )
                )
                .build()
    ;
}
Niranjan
@nnanda2016
i am not sure if there’s an easier way, but we do it currently in one of our apps and it works
vishakvinod
@vishakvinod
Hello, can someone give me steps to deploy a reactor application with binding to a PostgreSQL db service on cloudfoundry ?
Naresh Kumar Reddy Gaddam
@gaddam1987

Hi I have a use case where i need to call a paginated API repeatedly until i fetch all the pages in a sequence, as the response of the current will hold the nextPageId value which i need to use for next call.
I need some help to look at the code i wrote and provide your suggestions on it. Main Intention of the code is to share the state (nextPageId) from previous call to new .

     class PaginatedProducer implements Consumer<SynchronousSink<ApiResponse<String>>> {
        private volatile String nextPageId; 
        private volatile boolean firstReq = true;

        @Override
        public void accept(SynchronousSink<ApiResponse<String>> monoSynchronousSink) {

            if (firstReq) {
                firstReq = false;
                callToAPI(null) //call to api uses webclient and returns a MONO<ApiResponse<String>> 
                        .subscribe(s -> {
                            if (s.getHeaders().get("nextPageId") != null || !s.getHeaders().get("nextPageId").isEmpty()) {
                                nextPageId = s.getHeaders().get("nextPageId").get(0);
                                monoSynchronousSink.next(s);
                            } else {
                                monoSynchronousSink.next(s);
                                monoSynchronousSink.complete();
                            }
                        }, er -> {
                            monoSynchronousSink.error(er);
                            monoSynchronousSink.complete();
                        });
            } else if (nextPageId != null) {
                myMethod(nextPageId)
                        .subscribe(s -> {
                            if (s.getHeaders().get("nextPageId") != null && !s.getHeaders().get("nextPageId").isEmpty()) {
                                nextPageId = s.getHeaders().get("nextPageId").get(0);
                                monoSynchronousSink.next(s);
                            } else {
                                monoSynchronousSink.next(s);
                                monoSynchronousSink.complete();
                            }
                        }, er -> {
                            monoSynchronousSink.error(er);
                            monoSynchronousSink.complete();
                        });
            }
        }
    }

Then i used it with Flux.generate

Flux.generate(new PaginatedProducer())
    .subscribe(//consume it for further processing)
Sergei Egorov
@bsideup
@gaddam1987 you can also simplify it with Mono.fromCallable(...).repeat().takeUntil(...)
and, to preserve the state, you can use Mono.defer around it and store the next page id in the scope
also, it seems that you're calling another reactive API (calltoAPI), you can replace fromCallable with another defer
Leonard Brünings
@leonard84
@gaddam1987 have a look at expand, that is what I normally use for paginated access https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#expand-java.util.function.Function-
Mikael Elm
@mickeelm

Let's say a have a Mono that emits the value 1. I Also have a Flux that emits A,B,C. I can join these by using zipWith(), but I have to repeat the Mono into a Flux. In the end, I want 1A, 1B, 1C. The problem is that the value 1 is obtained from an expensive external call, I don't want to repeat that, so I cache the answer, e.g. I do cache().repeat().

This solves my problem, but it feels like a hack and while the Flux is emitting its values, the cache().repeat() on the Mono (which is now a Flux) is really fast so it may emit like 50 times which just feels unneccesary.

Anyone that has a neater solution for this problem?

Niranjan
@nnanda2016
@mickeelm can something like this work for you (using flatMapMany)?
public class MonoFlatMapManyExample {
    public static void main(final String[] args) {
        combine(getMono(), getFlux());
    }

    private static Mono<Integer> getMono() {
        return Mono.just(1);
    }

    private static Flux<String> getFlux() {
        return Flux.just("A", "B", "C");
    }

    private static void combine(final Mono<Integer> mono, final Flux<String> flux) {
        mono
            .flatMapMany(item -> flux.map(str -> new StringBuilder().append(item).append(str).toString()))
            .doOnNext(nextItem -> System.out.printf("Next emitted item -> %s%n", nextItem))
            .doFinally(signal -> System.out.printf("Completed with signal %s%n", signal.name()))
            .blockLast()
            ;
    }
}
The output of this is
Next emitted item -> 1A
Next emitted item -> 1B
Next emitted item -> 1C
Completed with signal ON_COMPLETE
Mikael Elm
@mickeelm
@nnanda2016 thanks! I like this way better!
Aah, come think of it, the only thing is that the processing of the flux (which in real life is a database call) won't happen in parallell, but first when the Mono is emitted. So that's a drawback I guess but it still might be a better option in this case.
Niranjan
@nnanda2016
yea you’re right; this solution works only if you want to use the result of mono in the flux.