Now : Reactor Core 3.3.0.RELEASE - Reactor Netty 0.9.0.RELEASE - BOM : DYSPROSIUM-RELEASE
repeatWhen(it -> it.delayElements(ofSeconds(10)))
Flux.<Flux<UnparsedWalEntry>>generate(s -> {
UnparsedWalEntry entry;
if (!queue.isEmpty() && (entry = queue.pop()) != null)
s.next(Flux.just(entry));
else {
if (sourceThread.isDisposed())
s.complete();
else
s.next(FluxExt.delay(ofSeconds(10)).cast(UnparsedWalEntry.class));
}
}).flatMap(Function.identity())
null, you could also return something like EmptyQueue.INSTANCE and then use it as an indicator that a delay should be applied
@unoexperto I would modify it a bit
Mono.fromCallable(() -> {
var item = queue.pop();
if (item == null) {
return isDisposed() ? PoisonPill.INSTANCE : null;
}
return new Message(item);
});
This would drain your queue if it still has items but your producer has already been disposed.
Hi everyone, I am a newbie to Project Reactor.
I am trying to implement a conditional retry based on the response of an API call, but I cannot access the response in the predicate function of the retry method, which is where I want to decide whether to retry.
webClient.post(
uri = uri,
body = body,
headers = headers(),
paramClass = Response::class.java
)
.map { it.status }
.retry { <the response is not accessible here> }
Can someone please help me with this?
@Amit-Thawait retry() will only get called on errors. From the docs:
retry(Predicate<? super Throwable> retryMatcher)
Re-subscribes to this Flux sequence if it signals any error that matches the given Predicate, otherwise push the error downstream.
Meaning, the predicate acts on the error/throwable, not on the response.
There are a couple of ways to achieve what you want. One is to signal an error/throw an Exception if the response doesn't match what you want (for instance in map()), then have retry() match on that exception (or retry on any error, i.e. without a predicate, if you wish).
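A minimal sketch of the "throw in map(), then retry" idea, using only reactor-core. The names UnexpectedStatusException, callApi and the PENDING/DONE statuses are hypothetical stand-ins for the real WebClient call and response, and retry(5) here retries on any error; the predicate overload quoted above could narrow it to the custom exception.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class RetryOnResponseExample {

    // Hypothetical exception that turns an unwanted response into an error signal.
    static class UnexpectedStatusException extends RuntimeException {
        UnexpectedStatusException(String status) {
            super("Unexpected status: " + status);
        }
    }

    // Hypothetical stand-in for the remote call: answers PENDING twice, then DONE.
    // fromCallable is lazy, so every re-subscription triggers a fresh "call".
    static Mono<String> callApi(AtomicInteger attempts) {
        return Mono.fromCallable(() -> attempts.incrementAndGet() < 3 ? "PENDING" : "DONE");
    }

    static String fetchStatusWithRetry(AtomicInteger attempts) {
        return callApi(attempts)
                .map(status -> {
                    // The response is visible here, so this is where the retry decision is made.
                    if (!"DONE".equals(status)) {
                        throw new UnexpectedStatusException(status);
                    }
                    return status;
                })
                // Re-subscribes on any error, at most 5 times.
                .retry(5)
                .block();
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        String status = fetchStatusWithRetry(attempts);
        System.out.println(status + " after " + attempts.get() + " attempts");
    }
}
```

The retry decision lives in map() because that is the only place the response is available; retry() itself only ever sees the Throwable.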
You can also add a filter at WebClient bean creation, throw a custom exception, and attach a retry there. Something like the code shown below:
public WebClient webClient() {
return WebClient.builder()
.baseUrl(baseUrl)
.filter((request, next) ->
next.exchange(request)
.doOnNext(clientResponse -> {
final HttpStatus httpStatus = clientResponse.statusCode();
// We will retry for 4xx errors.
if (httpStatus.is4xxClientError()) {
throw new CustomHttp4xxStatusException("4xx error while invoking API.");
}
// We will retry for 5xx errors.
if (httpStatus.is5xxServerError()) {
throw new CustomHttp5xxStatusException("5xx error while invoking API.");
}
})
.retryWhen(Retry.anyOf(CustomHttp4xxStatusException.class, CustomHttp5xxStatusException.class, ReadTimeoutException.class, WriteTimeoutException.class, SSLException.class)
.retryMax(maxRetryAttempt)
.backoff(Backoff.exponential(Duration.ofMillis(1000), Duration.ofMillis(maxRetryWindowInMillis), 3, true))
.jitter(Jitter.random(0.9))
.doOnRetry(retryContext -> logger.error("[Invoking API][Retry context: {}]", retryContext))
)
)
.build()
;
}
Hi, I have a use case where I need to call a paginated API repeatedly until I have fetched all the pages in sequence; the response of the current call holds the nextPageId value that I need for the next call.
I need some help reviewing the code I wrote and would welcome suggestions. The main intention of the code is to share state (nextPageId) from the previous call with the next one.
class PaginatedProducer implements Consumer<SynchronousSink<ApiResponse<String>>> {
private volatile String nextPageId;
private volatile boolean firstReq = true;
@Override
public void accept(SynchronousSink<ApiResponse<String>> monoSynchronousSink) {
if (firstReq) {
firstReq = false;
callToAPI(null) // callToAPI uses WebClient and returns a Mono<ApiResponse<String>>
.subscribe(s -> {
if (s.getHeaders().get("nextPageId") != null && !s.getHeaders().get("nextPageId").isEmpty()) {
nextPageId = s.getHeaders().get("nextPageId").get(0);
monoSynchronousSink.next(s);
} else {
monoSynchronousSink.next(s);
monoSynchronousSink.complete();
}
}, er -> {
monoSynchronousSink.error(er);
monoSynchronousSink.complete();
});
} else if (nextPageId != null) {
myMethod(nextPageId)
.subscribe(s -> {
if (s.getHeaders().get("nextPageId") != null && !s.getHeaders().get("nextPageId").isEmpty()) {
nextPageId = s.getHeaders().get("nextPageId").get(0);
monoSynchronousSink.next(s);
} else {
monoSynchronousSink.next(s);
monoSynchronousSink.complete();
}
}, er -> {
monoSynchronousSink.error(er);
monoSynchronousSink.complete();
});
}
}
}
Then I used it with Flux.generate:
Flux.generate(new PaginatedProducer())
.subscribe(/* consume it for further processing */);
Mono.defer around it and store the next page id in the scope
calltoAPI), you can replace fromCallable with another defer
expand, that is what I normally use for paginated access https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#expand-java.util.function.Function-
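A minimal sketch of paginated access with expand, using only reactor-core. The Page type and the fetchPage stub are hypothetical stand-ins for ApiResponse<String> and the real callToAPI; the three hard-coded pages are made up for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ExpandPaginationExample {

    // Hypothetical page type: a body plus the id of the next page (null on the last page).
    static final class Page {
        final String body;
        final String nextPageId;

        Page(String body, String nextPageId) {
            this.body = body;
            this.nextPageId = nextPageId;
        }
    }

    // Hypothetical stand-in for the real API call; a null id means "first page".
    static Mono<Page> fetchPage(String pageId) {
        return Mono.fromCallable(() -> {
            if (pageId == null) {
                return new Page("page-1", "p2");
            }
            if ("p2".equals(pageId)) {
                return new Page("page-2", "p3");
            }
            return new Page("page-3", null);
        });
    }

    // expand feeds each emitted page back into the function to obtain the next one,
    // so the next page id travels with the data instead of living in a mutable field.
    static Flux<Page> fetchAllPages() {
        return fetchPage(null)
                .expand(page -> page.nextPageId == null
                        ? Mono.<Page>empty()
                        : fetchPage(page.nextPageId));
    }

    static List<String> fetchAllBodies() {
        return fetchAllPages().map(page -> page.body).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(fetchAllBodies());
    }
}
```

Compared with Flux.generate, nothing is subscribed inside the generator and no volatile state is shared between calls; returning an empty Mono is what ends the sequence.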
Let's say I have a Mono that emits the value 1. I also have a Flux that emits A, B, C. I can join these by using zipWith(), but I have to repeat the Mono into a Flux. In the end, I want 1A, 1B, 1C. The problem is that the value 1 is obtained from an expensive external call that I don't want to repeat, so I cache the answer, e.g. I do cache().repeat().
This solves my problem, but it feels like a hack, and while the Flux is emitting its values, the cache().repeat() on the Mono (which is now a Flux) is really fast, so it may emit something like 50 times, which just feels unnecessary.
Does anyone have a neater solution for this problem?
public class MonoFlatMapManyExample {
public static void main(final String[] args) {
combine(getMono(), getFlux());
}
private static Mono<Integer> getMono() {
return Mono.just(1);
}
private static Flux<String> getFlux() {
return Flux.just("A", "B", "C");
}
private static void combine(final Mono<Integer> mono, final Flux<String> flux) {
mono
.flatMapMany(item -> flux.map(str -> new StringBuilder().append(item).append(str).toString()))
.doOnNext(nextItem -> System.out.printf("Next emitted item -> %s%n", nextItem))
.doFinally(signal -> System.out.printf("Completed with signal %s%n", signal.name()))
.blockLast()
;
}
}
Next emitted item -> 1A
Next emitted item -> 1B
Next emitted item -> 1C
Completed with signal ON_COMPLETE