All generic questions around Reactor. For advanced questions you can also try #reactor-core and #reactor-netty
public static Flux<ByteBuffer> chunker(Flux<ByteBuffer> data, int chunkSize) {
    return Flux.defer(() -> {
        AtomicLong countDown = new AtomicLong(chunkSize);
        return data.bufferUntil(buf -> {
            if (countDown.addAndGet(-buf.remaining()) <= 0L) {
                countDown.set(chunkSize);
                return true;
            }
            return false;
        }).concatMap(bufList -> {
            int size = bufList.stream().mapToInt(ByteBuffer::remaining).sum();
            // aggregate the list into one buffer, then flip so it is readable from position 0
            ByteBuffer merged = bufList.stream().reduce(ByteBuffer.allocate(size), ByteBuffer::put);
            merged.flip();
            return Mono.just(merged);
        });
    });
}
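A minimal usage sketch (the source Flux and chunk size are made up): re-chunk a stream of small buffers into ~1 KiB buffers.
Flux<ByteBuffer> source = Flux.range(0, 100)
        .map(i -> ByteBuffer.wrap(("item-" + i).getBytes(StandardCharsets.UTF_8)));
chunker(source, 1024)
        .subscribe(chunk -> System.out.println("chunk of " + chunk.remaining() + " bytes"));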
Is there a straightforward way to achieve a graceful flush/shutdown with an infinite source? I have a Flux from a KafkaReceiver, and at shutdown I want to stop receiving messages from upstream, but allow all of the in-flight work to complete, and then allow the KafkaReceiver to commit its offsets (via the provided ReceiverOffset.acknowledge callback) before the KafkaReceiver closes its underlying Kafka consumer. dispose seems to propagate upstream immediately, canceling all buffered work, and it also induces the KafkaConsumer to be closed before the offsets associated with that work can be committed.
I think this might naturally take the form of arranging for the request signals flowing upstream to the KafkaReceiver to be disabled at shutdown time, but I haven't found a simple way to inject that behavior into the wiring. I'm using a pretty hacky solution that involves publish to a ConnectableFlux, with a bogus consumer that stops requesting, but this seems pretty heavyweight and unnatural. Any suggestions?
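One sketch of that idea (names like shutdown, process and commitAndClose are hypothetical, and whether the KafkaReceiver commits offsets on cancellation is receiver-specific): takeUntilOther cancels the source when the signal fires, but sends onComplete downstream, so operators like concatMap drain their in-flight work before the pipeline terminates.
Sinks.Empty<Void> shutdown = Sinks.empty();
kafkaFlux
        .takeUntilOther(shutdown.asMono())     // on shutdown: cancel the receiver, complete downstream
        .concatMap(record -> process(record))  // already-buffered records still drain to completion
        .doOnComplete(() -> commitAndClose())  // hypothetical hook for offset commit + close
        .subscribe();
// at shutdown time:
shutdown.tryEmitEmpty();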
Hi everyone, I have been using Mono.zip a lot in my application, and since I am from a Node.js background I was thinking it works like Promise.all, but I recently found that in Mono.zip, if one of the Monos completes empty, the zip won't run.
Reproduction:
fun main() {
    val m1 = Mono.just("A")
    val m2 = Mono.just("B")
    val m3 = Mono.empty<String>()
    Mono.zip(m1, m2, m3).map {
        println("Data t1: ${it.t1} t2: ${it.t2} t3: ${it.t3}")
    }.subscribe()
}
It doesn't print anything.
Workaround:
fun main() {
    val m1 = Mono.just("A").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m2 = Mono.just("B").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m3 = Mono.empty<String>().map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    Mono.zip(m1, m2, m3).map {
        println("Data t1: ${it.t1.get()} t2: ${it.t2.get()} t3: ${it.t3.orElse(null)}")
    }.subscribe()
}
My requirement is that I have a couple of Monos and I need the output of all of them even if one is empty. If any of the Monos fails, then it can throw an error.
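A small helper in the same spirit as the workaround above (Java for illustration, the name is made up) keeps the zip from completing empty while still propagating real errors:
static <T> Mono<Optional<T>> orEmpty(Mono<T> source) {
    return source.map(Optional::of).defaultIfEmpty(Optional.empty());
}
// Mono.zip(orEmpty(m1), orEmpty(m2), orEmpty(m3)) always runs; an onError in any
// source still fails the zip.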
@OlegDokuka, any view regarding this, buddy?
The term “stream” [...] is a generic term that applies to Java Streams, Observables and even Iterators.
I have a question about ExchangeFilterFunction in WebClient. I declared a filter that runs backoff retry on error, but for the WebClientResponseException type it doesn't work: the retry mechanism doesn't run, unlike WebClientRequestException, for which it works. What am I doing wrong?
// creating the client
var client = WebClient.builder()
        .defaultHeader(CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
        .baseUrl(serviceUrl)
        .filter(decorateFunction)
        .build();
(...)
// example usage
webClient.delete().uri(deleteUrl).retrieve()
        .bodyToMono(XXX.class)
        .doOnSuccess(consumeSuccess())
        .doOnError(consumeError())
        .subscribe();
(...)
// decorateFunction is like the one below
public interface ReactiveRequestDecorator extends ExchangeFilterFunction {

    /**
     * Decorate the given request.
     *
     * @param request the request to decorate
     * @param <T> the type of result returned
     * @return the decorated request
     */
    <T> Mono<T> decorate(Mono<T> request);

    @Override
    default Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        return decorate(next.exchange(request));
    }
}
// example implementation
(...)
@Override
public <T> Mono<T> decorate(Mono<T> request) {
    return Mono.defer(() -> request)
            .retryWhen(Retry.backoff(3, Duration.ofMillis(200)));
}
(...)
Hi all. Is there any difference regarding memory usage when using share() in the following code?
final Path file = [...];
final Flux<DataBuffer> buffer = this.webClient.get().uri(url).retrieve().bodyToFlux(DataBuffer.class).retry(3);
// Option 1 without calling share()
DataBufferUtils.write(buffer, file, StandardOpenOption.CREATE).block();
// Option 2 with calling share()
DataBufferUtils.write(buffer, file, StandardOpenOption.CREATE).share().block();
My application has to download a lot of files, and I want to use as little RAM as possible.
Hi all, has anyone used Flow, coroutines, and WebFlux together in production? I want to know how the experience has been. I have a Spring WebFlux application that I want to convert to Flow and WebFlux. The benefit would be imperative-looking asynchronous code.
https://spring.io/blog/2019/04/12/going-reactive-with-spring-coroutines-and-kotlin-flow
Hello all, I need to investigate the cause of a subscription cancellation in a WebFlux application. How can I log/determine the cause of a cancel event? The following code is called from the RestController:
Mono.fromCallable(() -> {
        // do some blocking stuff
        log.info("creating some value");
        return someValue;
    })
    .doOnNext(p -> log.info("someValue created: {}", p))
    .doOnError(throwable -> log.error("Something went wrong on someValue creation", throwable))
    .doOnCancel(() -> log.error("cancelled on something ..."))
and produces the following log:
"message":"| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblyConditionalSubscriber)","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"nioEventLoopGroup-3-2"
"message":"| request(unbounded)","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"nioEventLoopGroup-3-2"
"message":"onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)","logger_name":"reactor.Mono.CallableOnAssembly.223","thread_name":"nioEventLoopGroup-3-2"
"message":"request(unbounded)","logger_name":"reactor.Mono.CallableOnAssembly.223","thread_name":"nioEventLoopGroup-3-2"
"message":"| cancel()","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"reactor-http-epoll-4",
"message":"cancel()","logger_name":"reactor.Mono.CallableOnAssembly.223","thread_name":"reactor-http-epoll-4",
"message":"cancelled on something ...","logger_name":"domain.Service","thread_name":"reactor-http-epoll-4"
"message":"creating some value","logger_name":"domain.Service2", "thread_name":"nioEventLoopGroup-3-2"
any help would be appreciated! Regards, Gena
Hello. I was reading the code for EmitterProcessor and wanted to ask a question.
Why is a loop used for synchronization in the add/remove methods?
If SUBSCRIBERS were kept in a Map structure, it seems System.arraycopy wouldn't have to be called on every add, and remove wouldn't have to traverse the array to find the element.
Maybe there is a reason I haven't thought of. If you let me know, I'd appreciate it.
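For illustration, that loop is the classic lock-free copy-on-write pattern (a simplification, not the actual EmitterProcessor code): the CAS retries when a concurrent add/remove swapped the array first, so no lock is needed, and the hot path (signal delivery, which runs far more often than add/remove) can iterate a plain array snapshot.
final AtomicReference<Object[]> subscribers = new AtomicReference<>(new Object[0]);

boolean add(Object inner) {
    for (;;) {
        Object[] cur = subscribers.get();
        Object[] next = Arrays.copyOf(cur, cur.length + 1);
        next[cur.length] = inner;
        if (subscribers.compareAndSet(cur, next)) {
            return true;  // CAS won; a concurrent change forces a retry instead
        }
    }
}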
Hello,
I want to build a system where multiple subscribers can subscribe to individual events, something like different queues in a message broker, and a publisher that just sends all messages to a single pile.
What I am trying to understand is how to organize this routing between subscribers so that each consumes only the messages addressed to that concrete consumer.
If it helps, I am using reactive Spring.
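A minimal sketch of one way to wire that (the Message type and addresses are made up): a single multicast sink acts as the pile, and each consumer filters on its own address; groupBy is an alternative when the set of addresses is dynamic.
record Message(String address, String payload) {}

Sinks.Many<Message> bus = Sinks.many().multicast().onBackpressureBuffer();
Flux<Message> all = bus.asFlux();

// consumer "A" sees only messages addressed to it
all.filter(m -> m.address().equals("A"))
   .subscribe(m -> System.out.println("A got: " + m.payload()));

bus.tryEmitNext(new Message("A", "hello"));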
Hello,
I am using a unicast sink like this. In the case of NON_SERIALIZED emits, the emitNext() method blocks until consumeItem completes. Why is this happening?
private final Sinks.Many<String> inputSink = Sinks.many().unicast().onBackpressureBuffer(queue);
// on publisher thread pool A
public Mono<Void> produceItem(String item) {
    log.info("Emit Next");
    // emitNext is blocking on NON_SERIALIZED emits. why??
    inputSink.emitNext(item, getEmitFailureHandler());
    log.info("Emit Done");
    return Mono.empty();
}
// subscriber thread pool B
public Flux<String> consumeItem() {
    return inputSink.asFlux()
            .publishOn(Schedulers.boundedElastic())
            .flatMap(...);
}
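For reference, since getEmitFailureHandler() is not shown, here is a sketch of one common implementation: busyLooping retries FAIL_NON_SERIALIZED emissions for a bounded time, which shows up as the emitting thread spinning ("blocking") until the contended emission succeeds.
private static Sinks.EmitFailureHandler getEmitFailureHandler() {
    // retry FAIL_NON_SERIALIZED for up to 1s before giving up
    return Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(1));
}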
I am trying to stress test a client-server system by submitting 1000 messages from the client, each as a Mono, but doing 10 simultaneously. My code is:
Flux.range(0, 1000) // send a total of 1000 messages
    .flatMap(i -> sendMessage())
    .limitRate(10) // 10 at a time
    .collectList()
    .block();
But it isn't working. Instead, the client initially submits 256 simultaneously.
The FluxFlatMap.FlatMapMain.onSubscribe(Subscription s) method uses:
s.request(Operators.unboundedOrPrefetch(maxConcurrency));
And maxConcurrency is 256.
It doesn't matter what values I use in limitRate, e.g., limitRate(2, 1).
This is caused by the flatMap method, which I call with default concurrency and prefetch:
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
    return flatMap(mapper, Queues.SMALL_BUFFER_SIZE, Queues.XS_BUFFER_SIZE);
}
I tried moving the limitRate() method in front of flatMap but get the same behavior.
So limitRate doesn't do anything here, and the code needs to be:
Flux.range(0, 1000)
    .flatMap(i -> sendMessage(), 10, 10)
    //.limitRate(10) has no effect
    .collectList()
    .block();
Anyone have an explanation?
Folks, I just noticed that the following code in my main data stream causes significant performance degradation (~2x):
.timeout(inactivityTimeout, Flux.defer {
    logger.trace("Stopping job after $inactivityTimeout of inactivity.")
    Flux.empty()
})
Async-profiler shows that a significant amount of time is spent in the java.util.concurrent.locks.LockSupport.unpark method. What am I missing here?
reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response. wiretap: true does not show anything suspicious. We use httpClient.secure for configuring mutual authentication.
I recently switched my app over from WebMVC to WebFlux, and tried to configure this:
@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
                .allowedOrigins("*");
    }

    @Bean
    CorsWebFilter corsWebFilter() {
        var corsConfig = new CorsConfiguration();
        corsConfig.setAllowedOrigins(List.of("*"));
        var source = new UrlBasedCorsConfigurationSource();
        source.registerCorsConfiguration("/**", corsConfig);
        return new CorsWebFilter(source);
    }
}
and also tried adding @CrossOrigin to my controller. But I'm still not getting the cross-origin response headers. Anyone know the fix?
Hi :wave:. I'm an engineer with Couchbase, and I'm struggling with adding full thread-safety to our transactions library that is based around reactor. For some context, here is a sample transaction:
transactions.reactive().run((ctx) -> {
    // This chain is specified by the user, saying what they want to happen in the transaction.
    return Flux.fromIterable(ids)
            .parallel()
            .runOn(Schedulers.elastic())
            // User is doing concurrent INSERTs transactionally
            .flatMap(id -> ctx.query("INSERT..."))
            .then();
}).subscribe();
The ctx.query() internally needs to take a mutex lock for part of the work. (I’ve written a reactive-compatible mutex that keeps a list of lock-waiters - Sinks).
The problem is if one of those concurrent query operations fails. I understand that a stream can only raise one error. The way this seems to be implemented is that if an error is raised past the concatMap, all the remaining concurrent query operations simply stop at the end of their current operator; no error or complete signal is raised on them. This understanding is based on trial and error; am I correct on this?
This is causing me some real issues, as I need those concurrent query ops to do some tidy-up before dying: namely, they have to remove themselves from the mutex (e.g. unlocking it if they hold the lock, removing themselves from the list of waiters if they don't). Otherwise the subsequent rollback, which also needs the lock, deadlocks.
On non-query operations I use a workaround of waiting for the other ops to complete before propagating the error (and also rejecting any new operations). But this doesn't work for query, as some query errors are potentially not fatal to the transaction and need to be raised out of ctx.query() to give the user the chance to ignore them.
I hope that makes sense, let me know if there's anything I need to clarify.
So I'm not sure exactly what my request is. If I had a way to do some work just before an operation dies, that would work for me. Or perhaps there is an alternative way of doing things that I'm missing here: I feel this must be a relatively common situation, e.g. concurrent ops needing to do cleanup if one of them fails. Please let me know if you have any ideas.
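One idea, as a sketch under assumptions (mutex and query() stand in for the custom reactive lock and ctx.query(); whether this fits the Couchbase internals I can't say): doFinally fires on SignalType.CANCEL as well as on complete/error, so it gives each concurrent op a hook to unlock or deregister even when a sibling's failure cancels it.
Mono<Result> guarded = mutex.acquire()
        .then(Mono.defer(() -> query()))
        .doFinally(signal -> mutex.release());  // signal == CANCEL when a sibling errored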
We are using limitRate. We see far more requests than expected. Right now our service is dying with out-of-memory errors, because the repository is too slow. What do we miss here? Any advice?
clientA.retrieveFoos()
    .flatMap { foo -> clientB.retrieveBars(foo) }
    .flatMap { someSlowRepository.save(it) }
    .limitRate(10)
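For comparison, a sketch in Java (same hypothetical clients and repository): bound the in-flight work at the flatMap itself, since, per the limitRate discussion earlier in this channel, a limitRate placed after flatMap does not cap flatMap's default concurrency of 256.
clientA.retrieveFoos()
        .flatMap(foo -> clientB.retrieveBars(foo), 10)    // at most 10 concurrent retrieveBars
        .flatMap(bar -> someSlowRepository.save(bar), 10) // at most 10 concurrent saves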
Issue with Spring WebClient subscribe(...).
I have set up a Spring Cloud Stream application. Here is my consumer function:
@Bean
public Consumer<Model> rssConsumer(final Handler handler) {
    return model -> {
        if (model != null) {
            LOGGER.info("Received url: '{}' ", model.getUrl());
            handler.process(model.getUrl());
        }
    };
}
I see that I have received 752 URLs and each one reaches the WebClient call, but when I look at the subscribe block I am receiving only 500 responses. I don't understand why I am not getting a response for every URL even though I received 752 messages.
Here is my WebClient code as well as the WebClient configuration:
public void execute(final String url) {
    webClient.get().uri(url)
            .exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class)
                    .flatMap(responseBody -> Mono.just(
                            new Model(responseBody, clientResponse.statusCode().value()))))
            .onErrorResume(exception -> Mono.just(
                    new Model(exception.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR.value())))
            .subscribe(model -> {
                // save data in file
            });
}

@Bean
public HttpClient httpClient() {
    return HttpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
            .responseTimeout(Duration.ofMillis(30000))
            .doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(30000, TimeUnit.MILLISECONDS))
                    .addHandlerLast(new WriteTimeoutHandler(30000, TimeUnit.MILLISECONDS)));
}

@Bean
public WebClient webClient(HttpClient httpClient) {
    return WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();
}
Has the behavior of Schedulers.onHandleError changed between reactor-core versions 3.3.5 and 3.4.12? We were on Spring Boot 2.3.x (and Reactor 3.3.5) before and had a Spring integration test that threw an error from one of the dependencies used in the reactor chain. The test expected the Schedulers.onHandleError method to handle the error, but after upgrading to Spring Boot 2.6.1 (and Reactor 3.4.12) the test has started failing. Upon debugging, I see that Hooks.onErrorDropped is receiving the error. Wanted to know your thoughts on this. Thanks!
Hi folks. I noticed an interesting thing today. Consider the following example:
val source = Flux.concat(
    Mono.fromCallable { "one" },
    Mono.fromCallable { "two" },
    Mono.fromCallable { "three" },
)
source
    .timeout(Duration.ofSeconds(5), Flux.defer {
        logger.trace("Stopping job after 5 seconds of inactivity.")
        Flux.empty()
    })
    .subscribe()
It turned out Mono.fromCallable { "three" } may never get to materialize if source is cancelled by the downstream timeout(). Can I do something about it? My goal is to have a finalizing Flux at the end of the source that must emit some event.
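A sketch of one option (in Java for illustration; the FINAL marker is made up): emit the finalizing event on both paths, concatWith for normal completion and the timeout fallback for the inactivity path, where upstream (including the concatWith) has already been cancelled.
Flux<String> withFinal = source
        .concatWith(Mono.just("FINAL"))                       // normal completion path
        .timeout(Duration.ofSeconds(5), Flux.just("FINAL"));  // inactivity path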
Hey folks. Curious if you could advise me on my use case. I have a pool of workers that upload files to a remote server. It's basically a Flux derived from a Sinks.Many<Job> which I convert to a parallel one with predefined parallelism. My goal is to upload all files as fast as possible, not to upload each file as fast as possible. You can assume the remote server has unlimited scalability (AWS S3).
My dilemma is that I need to find a balance between the number of files I upload simultaneously and throughput. If parallelism is too low, I won't utilize bandwidth fully. If parallelism is too high (= too many chunks being uploaded simultaneously), the throughput of each upload is too low, which may cause timeout disconnection.
The question to you is: can I have dynamic parallelism in ParallelFlux, so I can change the number of files being uploaded on the fly?
Hi folks, I have a situation where I am using reactive Redis (Lettuce).
I want to execute the MSET command and, at the same time, set expiry on a different thread. Here is my code.
My question is: is using publishOn with Schedulers.single() good, or overkill?
As I have seen, mostly when we use blocking code we use the publishOn or subscribeOn operator. Is there another operator I can use?
I want to execute expireKeysWithCatch separately and do not want to wait for its result.
inline fun <reified T : Any> putMany(keyValuePair: Map<RedisKey, T>): Mono<Boolean> {
    if (keyValuePair.keys.isEmpty()) return true.toMono()
    val serializedKeyValuePair = mutableMapOf<String, String>()
    keyValuePair.forEach { (redisKey, value) ->
        serializedKeyValuePair[redisKey.value] = objectMapper.serialize(value)
    }
    return reactiveRedisTemplate.opsForValue().multiSet(serializedKeyValuePair)
        .publishOn(Schedulers.single())
        .flatMap { Mono.fromCallable { expireKeysWithCatch(keyValuePair.keys) } }
        .flatMap { it }
}

fun expireKeysWithCatch(redisKeys: Iterable<RedisKey>): Mono<Boolean> {
    val redisKeysFlux = Flux.fromIterable(redisKeys)
    return redisKeysFlux.flatMap { redisKey -> expireKey(redisKey) }
        .then(Mono.just(true))
}
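One sketch (in Java for illustration, mirroring the names above) that avoids waiting on the expiry result entirely, assuming fire-and-forget is acceptable and errors are handled inside expireKeysWithCatch:
return reactiveRedisTemplate.opsForValue().multiSet(serializedKeyValuePair)
        .doOnSuccess(ok -> expireKeysWithCatch(keyValuePair.keySet())
                .subscribeOn(Schedulers.single())
                .subscribe());  // fire-and-forget: the MSET result is returned without waiting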
Hey guys,
I'm unsure whether to ask this here or via the spring-boot folks. I'm giving it a try, but feel free to send me away if this is not the proper channel. I'm trying to set up a reactive Spring controller capable of returning a reactive multipart response consisting of e.g. application/json and application/octet-stream (the application is backed by Spring Boot making use of Netty). For now, I have a simple mocked response:
@PostMapping(value = "/multipart")
public MultiValueMap<String, HttpEntity<?>> multipart() {
    MultipartBodyBuilder builder = new MultipartBodyBuilder();
    // mock response for now
    builder.asyncPart("json", Mono.just("{\"motto\": \"fake it til you make it\"}"), String.class);
    builder.asyncPart("audio", Flux.empty(), new ParameterizedTypeReference<byte[]>() {});
    return builder.build();
}
This does not work though as I'm greeted with:
class org.springframework.util.LinkedMultiValueMap cannot be cast to class org.springframework.http.codec.multipart.Part (org.springframework.util.LinkedMultiValueMap and org.springframework.http.codec.multipart.Part are in unnamed module of loader 'app')
Could you guys please give me some pointers on what I am doing wrong here?
Has anyone built a reactive component that works with a persistent queue? Something that can handle "any amount" of pushed objects from upstream where the downstream has limited capacity. My application is a server that receives unbounded input messages with a backend database of finite capacity. Given that the input messages are serializable, we can publish them to a chronicle queue with an immediate "ack" to the client. The backend then subscribes to the queue and pulls as fast as it can.
I can write the individual publish and subscribe components easily. And each side of the queue is easy to make reactive. But the "cleanest" approach would be to integrate this into the reactive pipeline.
Without him, I don't think Reactor would exist. We all owe him a lot.
Condolences to his family.
We have an application built on top of Spring Cloud Stream. We are processing around 6000 messages, which are URLs that we have to process. Here is my code snippet:
// HttpService
public Mono<ResponseModel> download(final String url) {
    return webClient.get().uri(url)
            .exchangeToMono(clientResponse -> clientResponse.toEntity(String.class)
                    .map(responseEntity -> new ResponseModel(responseEntity.getBody(),
                            responseEntity.getStatusCode().value(),
                            CollectionUtils.toMultiValueMap(responseEntity.getHeaders().entrySet().stream()
                                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))));
}
// broker consumer (using Spring Cloud Stream)
@Bean
public Consumer<Model> consumer() {
    return model -> {
        download(model.getUrl())
                .onErrorResume(exception -> Mono.just(
                        new ResponseModel(exception.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR.value())))
                .subscribe(responseModel -> {
                    // some Java code processing
                    // db-level check
                    // db save entity
                    // file save
                });
    };
}
So consumer() is the entry point for getting messages from the broker to process data. The Consumer is blocking, but internally we are trying to use the non-blocking WebClient with subscribe(...). The subscribe(...) block will contain blocking code as mentioned above, like some Java code processing, DB operations, and finally saving to a file. Is it correct to write WebClient in a non-blocking way and have blocking calls inside the subscribe(...) block, or is there a better way to do such things? Thank you.
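For what it's worth, a sketch of one alternative (assuming the same Model/download as above): let the consumer block per message so backpressure stays at the broker binding, and shift the blocking post-processing onto boundedElastic instead of running it inside subscribe() on a reactor-netty event-loop thread.
return model -> download(model.getUrl())
        .onErrorResume(e -> Mono.just(new ResponseModel(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR.value())))
        .publishOn(Schedulers.boundedElastic())
        .doOnNext(responseModel -> {
            // blocking work: processing, DB check/save, file save
        })
        .then()
        .block();  // the Consumer is synchronous, so blocking here is acceptable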