Dmitry
@dshuplyakov
Hi there!
I want to read a message and then execute an HTTP request. If the HTTP request returns 2xx, ack the message; otherwise, nack it. But the problem is that flatMap returns Mono<ResponseEntity>, so I can't pass the original AMQP message to map.
RabbitFlux.createReceiver().consumeManualAck(QUEUE_NAME, consumeOptions)
        .flatMap(amqpMessage -> {
            String url = new String(amqpMessage.getBody());
            return webclient.get().uri(url).retrieve().toBodilessEntity();
        })
       .map(responseEntity -> {
           if (responseEntity.getStatusCode().is2xxSuccessful()) {
               // how to pass here amqpMessage?
               amqpMessage.ack();    
           } else {
               amqpMessage.nack();
           }
           return amqpMessage;
        }).subscribe();
1 reply
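One common way to keep the original message in scope is to do the ack/nack inside the flatMap lambda, chained onto the inner Mono, instead of in a later map. The sketch below uses reactor-core only. `Delivery`, `Recorded`, `process` and the `httpStatus` function are hypothetical stand-ins for the RabbitMQ delivery and the WebClient call, not part of any library. Note also that, as far as I know, WebClient's retrieve() signals an error for 4xx/5xx responses by default, so the sketch treats an error from the call as a nack.

```java
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class AckAfterCall {
    // Hypothetical stand-in for an acknowledgeable AMQP delivery.
    interface Delivery {
        String body();
        void ack();
        void nack();
    }

    // Hypothetical in-memory delivery that records what happened to it.
    static final class Recorded implements Delivery {
        final String body;
        String state = "pending";
        Recorded(String body) { this.body = body; }
        public String body() { return body; }
        public void ack() { state = "ack"; }
        public void nack() { state = "nack"; }
    }

    // httpStatus is an assumed function: URL in, HTTP status code out.
    static <D extends Delivery> Flux<D> process(Flux<D> messages,
                                                Function<String, Mono<Integer>> httpStatus) {
        return messages.flatMap(msg ->
            httpStatus.apply(msg.body())
                .map(status -> status >= 200 && status < 300)
                .onErrorReturn(false)      // a failed call counts as "not 2xx"
                .defaultIfEmpty(false)     // no status at all also counts as "not 2xx"
                .doOnNext(ok -> { if (ok) msg.ack(); else msg.nack(); })
                .thenReturn(msg));         // msg is still in scope here
    }
}
```

Because the inner chain is built inside the lambda, `msg` never has to travel through the outer pipeline as a separate value.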
harshdave840
@harshdave840
Hi everyone. I have 2 hot fluxes running in parallel and one mono. I want the mono to always be executed after both fluxes have completed. I tried to concat the first flux and the mono, but the mono executes immediately after the first flux has completed. How do I make sure that the mono always runs after both parallel fluxes have completed? Below is just an example.

var sourceFlux= Flux.range(1,10);
var hotflux1=sourceFlux.publish().refCount(2);
var firstFlux=hotflux1.map(i->i*2).log("First");

//this is a io blocking call
var secondFlux=hotflux1
.publishOn(Schedulers.boundedElastic()).map(i->i*5).log("Second");
secondFlux.subscribe();
//

var firstMono=Flux.just(300).log("First Mono");
var concatedFlux=Flux.concat(firstFlux,firstMono);
concatedFlux.subscribe();

3 replies
In the real code, the first flux calls a service, the second flux is a DB call, and the first mono is also a service call. The mono requires data from both the first flux and the second flux before calling its service.
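Since the final call needs data from both fluxes, one option is to collect each flux into a list and zip the two results, so the final call can only start once both have completed. This is a minimal sketch with cold sources; `AfterBoth`, `sumAfterBoth` and `finalCall` are hypothetical names, and summing the values merely stands in for the real service call. Whether it fits the hot `publish().refCount(2)` setup above depends on how the two branches are subscribed, which is not exercised here.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class AfterBoth {
    // Subscribes to both fluxes, waits for both to complete, then runs the final call.
    static Mono<Integer> sumAfterBoth(Flux<Integer> first, Flux<Integer> second) {
        return Mono.zip(first.collectList(), second.collectList())
                .flatMap(both -> finalCall(both.getT1(), both.getT2()));
    }

    // Hypothetical final service call that needs the data from both fluxes.
    static Mono<Integer> finalCall(List<Integer> a, List<Integer> b) {
        int sum = a.stream().mapToInt(Integer::intValue).sum()
                + b.stream().mapToInt(Integer::intValue).sum();
        return Mono.just(sum);
    }
}
```

If the final call does not need the data and only the ordering matters, `Mono.when(first, second).then(finalMono)` is a lighter alternative.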
Ben Tomasini
@btomasini

I am using an approach similar to this to lift a reactor context into ThreadLocal for use by a grpc client interceptor.

https://stackoverflow.com/questions/47501349/using-hooks-and-a-lift-to-push-context-into-threadlocal

I am getting some stale values due to some bugs in the way I am using it. My main concern with this approach is its dependence on the onNext call to lift a fresh context into the ThreadLocal. If used incorrectly - namely not doing a contextWrite on a stream, it can lead to leaked values from other threads.

One way I thought of to add more safety would be to clear the ThreadLocal in the onComplete or onError of the subscriber. From my initial inspection it appears this works because, so far, the onComplete and onError is invoked on the same thread as onNext.

Question: Is it guaranteed that onComplete and onError is called on the same thread as onNext for a given subscriber like this?

1 reply
L.J.
@liujingbreak

Is there any exhaustMap operator on reactor core ?

I have the same question. I went through the Reactor 3 API docs and can't find an equivalent operator; Google led me to this channel and conversation.

Marek Andreansky
@skyhirider
Can you recommend a well written open source library that utilizes Reactor? I would like to read through some real code to see how I could utilize Reactor.
Salathiel Genèse
@SalathielGenese
Spring Webflux !?
Netty ?
John Meehan
@n0rb3rt
Looking for a way to chunk a Flux<ByteBuffer> into specified chunk sizes in bytes. Does such a solution already exist somewhere?
bruto1
@bruto1

Looking for a way to chunk a Flux<ByteBuffer> into specified chunk sizes in bytes. Does such a solution already exist somewhere?

concatMap and external state for buffer remainders less than specified size?

John Meehan
@n0rb3rt
That's the path I'm headed down. Coming from Akka streams and Scala, keeping external state seemed like the wrong approach. But I see from other examples that's not out of the norm here. Something like?
    public static Flux<ByteBuffer> chunker(Flux<ByteBuffer> data, int chunkSize) {
        return Flux.defer(() -> {
            AtomicLong countDown = new AtomicLong(chunkSize);
            return data.bufferUntil(buf -> {
                if (countDown.addAndGet(-buf.remaining()) <= 0L) {
                    countDown.set(chunkSize);
                    return true;
                }
                return false;
            }).concatMap(bufList -> {
                int size = bufList.stream().mapToInt(ByteBuffer::remaining).sum();
                ByteBuffer merged = ByteBuffer.allocate(size);
                bufList.forEach(merged::put);
                merged.flip(); // make the copied bytes readable from position 0
                return Mono.just(merged);
            });
        });
    }
4 replies
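The "external state for buffer remainders" idea can be sketched as a small stateful helper that emits chunks of exactly the requested size and carries the remainder over to the next incoming buffer. This is an illustrative sketch using only the JDK; `Rechunker`, `feed` and `flush` are hypothetical names, not Reactor API.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class Rechunker {
    private final int chunkSize;
    private ByteBuffer current;

    Rechunker(int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.chunkSize = chunkSize;
        this.current = ByteBuffer.allocate(chunkSize);
    }

    // Consumes one incoming buffer and returns every chunk that became full.
    List<ByteBuffer> feed(ByteBuffer in) {
        List<ByteBuffer> full = new ArrayList<>();
        while (in.hasRemaining()) {
            int n = Math.min(in.remaining(), current.remaining());
            ByteBuffer slice = in.duplicate();
            slice.limit(slice.position() + n);   // view of the next n bytes only
            current.put(slice);
            in.position(in.position() + n);      // mark those bytes as consumed
            if (!current.hasRemaining()) {
                current.flip();
                full.add(current);
                current = ByteBuffer.allocate(chunkSize);
            }
        }
        return full;
    }

    // Returns the final partial chunk, or an empty list if nothing is pending.
    List<ByteBuffer> flush() {
        if (current.position() == 0) {
            return Collections.emptyList();
        }
        current.flip();
        ByteBuffer last = current;
        current = ByteBuffer.allocate(chunkSize);
        return Collections.singletonList(last);
    }
}
```

Inside a pipeline, one instance per subscription could be created in `Flux.defer`, with `data.concatMapIterable(rechunker::feed)` followed by `concatWith(Flux.defer(() -> Flux.fromIterable(rechunker.flush())))` to emit the tail. The helper is not thread-safe, so it relies on the sequential delivery that a single Flux provides.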
kerr
@hepin1989
Hi, how do I implement throttling in Reactor?
Daniel Wahlqvist
@offroff
Can I do a nested groupBy? I have equipments and for each equipment I have different metrics. So I would like to do a groupBy for each equipment and then an inner groupBy for each type of metric. That might be the wrong approach of course but I'm mostly trying to get started and learn
2 replies
josh gruenberg
@joshng

is there a straightforward way to achieve a graceful flush/shutdown with an infinite source? I have a Flux from a KafkaReceiver, and at shutdown, I want to stop receiving messages from upstream, but allow all of the in-flight work to complete, and then allow the KafkaReceiver to commit its offsets (via the provided ReceiverOffset.acknowledge callback), before the KafkaReceiver closes its underlying kafka consumer. dispose seems to propagate upstream immediately, canceling all buffered work... and it also induces the KafkaConsumer to be closed before the offsets associated with that work can be committed.

I think this might naturally take the form of arranging for the request signals flowing upstream to the KafkaReceiver to be disabled at shutdown time, but I haven't found a simple way to inject that behavior into the wiring. I'm using a pretty hacky solution that involves publish to a ConnectableFlux, with a bogus consumer that stops requesting ... but this seems pretty heavyweight and unnatural. Any suggestions?

3 replies
Zakir
@Zaky7

Hi everyone, I have been using Mono.zip a lot in my application, and since I come from a Node.js background, I assumed it works like Promise.all. But I recently found that with Mono.zip, if one of the monos completes empty, the zipped result is never emitted.

replication

fun main() {
    val m1 = Mono.just("A")
    val m2 = Mono.just("B")
    val m3 = Mono.empty<String>()

    Mono.zip(m1,m2,m3).map {
        println("Data t1: ${it.t1} t2: ${it.t2} t3: ${it.t3}")
    }.subscribe()
}

It doesn't print anything

workaround

fun main() {
    val m1 = Mono.just("A").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m2 = Mono.just("B").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m3 = Mono.empty<String>().map { Optional.of(it) }.defaultIfEmpty(Optional.empty())

    Mono.zip(m1,m2,m3).map {
        println("Data t1: ${it.t1.get()} t2: ${it.t2.get()} t3: ${it.t3.orElse(null)}")
    }.subscribe()
}

My requirement is that I have a couple of monos and I need the output of all of them even if one is empty. If any of the monos fails, it can throw an error.

@OlegDokuka. any view regarding this buddy

1 reply
Salathiel Genèse
@SalathielGenese
I think you're doing it right, Zakir. Since null value will result in an empty mono
3 replies
Daniel Wahlqvist
@offroff
I have a flux and I want to dynamically tell it how to transform the data. My idea is to join the flux with another "command flux" and adapt the processing depending on the last command. Is this idea similar to a real and tested pattern someone else has used? I suspect you guys can give me some hints. What I need to do is figure out how to implement this, and I need to validate whether it's a good pattern at all.
3 replies
Daniel Wahlqvist
@offroff
I've been reading the API documentation and feel a little overwhelmed but also impressed and curious. Now I'm wondering; what is Reactor? Sure, I get it's a reactive programming framework but what I haven't anticipated is what such a framework must evolve into, if it's successful. I didn't realize this before but now I believe a successful reactive programming framework MUST in the long run evolve into a successful stream processing framework. That might be a strange claim, and I know that's not what many people believe. E.g. this attitude is common: https://www.tikalk.com/posts/2017/12/06/streaming-vs-reactive/ Given all this; what is Reactor and what is it evolving into?
Salathiel Genèse
@SalathielGenese
That article just sparks confusion
The term “stream” [...] is a generic term that applies to Java Streams, Observables and even Iterators.
1 reply
Kirill Marchuk
@62mkv
Hi all. I am a bit confused with Gradle Metadata. In the .pom artifact from here: https://repo1.maven.org/maven2/io/projectreactor/reactor-core/3.3.5.RELEASE/, it says "published-with-gradle-metadata". but the folder for this release (same URL) does not contain any .module file. How come?
SiwyDym
@SiwyDym
Hi! I have a problem with using ExchangeFilterFunction in WebClient. I declared a filter that runs a backoff retry on error, but for the WebClientResponseException type it doesn't work: the retry mechanism does not run. For WebClientRequestException, by contrast, it works. What am I doing wrong?
//creating client 
var client = WebClient.builder()
               .defaultHeader(CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .baseUrl(serviceUrl)
                .filter(decorateFunction)
                .build();
(...)
//example of using
webClient.delete().uri(deleteUrl).retrieve()
            .bodyToMono(XXX.class)
            .doOnSuccess(consumeSuccess())
            .doOnError(consumeError())
            .subscribe();
(...)
//`decorateFunction` is a like below 
public interface ReactiveRequestDecorator extends ExchangeFilterFunction {

    /**
     * Decorate the given request.
     *
     * @param request the request to decorate
     * @param <T> the type of result returned
     * @return decorated request
     */
    <T> Mono<T> decorate(Mono<T> request);

    @Override
    default Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        return decorate(next.exchange(request));
    }

}

// example implementation
(...)
@Override
public <T> Mono<T> decorate(Mono<T> request) {
        return Mono.defer(() -> request)
                .retryWhen(Retry.backoff(3, Duration.ofMillis(200)));
    }
(...)
1 reply
Patrick Gotthard
@PatrickGotthard

Hi all. Is there any difference regarding memory usage when using share() in the following code?

final Path file = [...];
final Flux<DataBuffer> buffer = this.webClient.get().uri(url).retrieve().bodyToFlux(DataBuffer.class).retry(3);

// Option 1 without calling share()
DataBufferUtils.write(buffer, file, StandardOpenOption.CREATE).block();

// Option 2 with calling share()
DataBufferUtils.write(buffer, file, StandardOpenOption.CREATE).share().block();

My application has to download a lot of files and I want to use as little RAM as possible.

Zakir
@Zaky7

Hi all, has anyone used Flow, coroutines, and WebFlux together in production? I want to know how the experience is. I have a Spring WebFlux application that I want to convert to Flow and WebFlux. The benefit would be imperative-looking asynchronous code.

https://spring.io/blog/2019/04/12/going-reactive-with-spring-coroutines-and-kotlin-flow

4 replies
Ghenadii Batalski
@ghenadiibatalski

Hello all, i need to investigate the subscription cancellation cause on a webflux application. How can i log/determine the cause of a cancel event. Following code is called from the RestController:

 Mono.fromCallable(() -> {
            // do some blocking stuff
            log.info("creating some value");
            return someValue;
        })
    .doOnNext(p -> log.info("someValue created: {}", p))
        .doOnError(throwable -> log.error("Something went wrong on someValue creation", throwable))
        .doOnCancel(() -> log.error("cancelled on something ..."))

and produces following log:

"message":"| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblyConditionalSubscriber)","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"nioEventLoopGroup-3-2"
"message":"| request(unbounded)","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"nioEventLoopGroup-3-2"
"message":"onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)","logger_name":"reactor.Mono.CallableOnAssembly.223","thread_name":"nioEventLoopGroup-3-2"        
"message":"request(unbounded)","logger_name":"reactor.Mono.CallableOnAssembly.223","thread_name":"nioEventLoopGroup-3-2"
"message":"| cancel()","logger_name":"reactor.Mono.OnAssembly.224","thread_name":"reactor-http-epoll-4",
"message":"cancel()","logger_name":"reactor.Mono.CallableOnAssembly.223","thread_name":"reactor-http-epoll-4",
"message":"cancelled on something ...","logger_name":"domain.Service","thread_name":"reactor-http-epoll-4"
"message":"creating some value","logger_name":"domain.Service2", "thread_name":"nioEventLoopGroup-3-2"

any help would be appreciated! Regards, Gena

8 replies
karanbir8080
@karanbir8080
Hi , I had a doubt regarding how Spring WebClient works in a typical spring MVC application. I learnt that spring cloud gateway cannot run on mvc as it relies on spring webflux and that we should not mix runtime models like netty and tomcat . So my question was how can webclient run efficiently in spring mvc application ?
12 replies
bgrooot
@bgrooot

Hello. I was reading the code for EmitterProcessor and wanted to ask a question.

Why do we use loop for synchronization in add / remove methods?

If 'SUBSCRIBERS' is saved in Map structure, it seems that 'System.arraycopy' is not called every time in the add method. And in the remove method, it seems that it is not necessary to find the element while traversing the array.

Maybe there is a reason I didn't think of. If you let me know, I'd appreciate it.

4 replies
Alexey Anufriev
@alexey-anufriev

Hello,

I want to build a system where multiple subscribers can subscribe to individual events, something like different queues in a message broker, and a publisher that just sends all messages to a single pile.

What I am trying to understand is how to organize this routing between subscribers so each consumes only messages addressed to this concrete consumer.

If that may help, I am using reactive spring.

Raghu
@code-uri

Hello,

I am using a unicast sink like this. In the case of NON_SERIALIZED emits, the emitNext() method blocks until consumeItem completes. Why is this happening?

private final Sinks.Many<String> inputSink = Sinks.many().unicast().onBackpressureBuffer(queue);

//On publisher ThreadPool A
public Mono<Void> produceItem(String item) {
        log.info("Emit Next");
       //emitNext is blocking on NON_SERIALIZED emits. why??
        inputSink.emitNext(item, getEmitFailureHandler());
        log.info("Emit Done");
        return Mono.empty();
    }

//subscriber Thread Pool B
 public Flux<String> consumeItem() {
        return inputSink.asFlux()
                .publishOn(Schedulers.boundedElastic())
                .flatMap(...);
}
1 reply
RichMacDonald
@RichMacDonald

I am trying to stress test a client-server system by submitting 1000 messages from the client, each as a Mono, but doing 10 simultaneously. My code is:

Flux.range(0, 1000)  //send a total of 1000 messages
    .flatMap(i -> sendMessage())
    .limitRate(10)   //10 at a time
    .collectList()
    .block();

But it isn't working. Instead, the client initially submits 256 simultaneously.

The FluxFlatMap.FlatMapMain.onSubscribe(Subscription s) method uses:

s.request(Operators.unboundedOrPrefetch(maxConcurrency));

And maxConcurrency is 256.

It doesn't matter what values I use in limitRate, e.g., limitRate(2,1)

This is caused by the flatMap method, which I call with default concurrency and prefetch:

public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
    return flatMap(mapper, Queues.SMALL_BUFFER_SIZE, Queues
            .XS_BUFFER_SIZE);
}

I tried moving the limitRate() method in front of flatMap but get the same behavior.

So limitRate doesn't do anything, and the code needs to be:

Flux.range(0, 1000)
    .flatMap(i -> sendMessage(), 10, 10)
    //.limitRate(10) has no effect
    .collectList()
    .block();

Anyone have an explanation?
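A likely explanation: the number of inner publishers that flatMap subscribes to at once is governed by its own concurrency argument (256 by default), because flatMap requests that many items from upstream regardless of what downstream requests. limitRate after flatMap only shapes the demand that flatMap sees from downstream, and limitRate before flatMap only splits flatMap's request of 256 into smaller batches that are replenished as items pass through. The sketch below measures the peak number of in-flight inner Monos; `FlatMapConcurrency` and `maxInFlight` are hypothetical names, and `Mono.delay` stands in for `sendMessage()`.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FlatMapConcurrency {
    // Returns the highest number of inner Monos that were in flight at once.
    static int maxInFlight(int total, int concurrency) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Flux.range(0, total)
            .flatMap(i -> Mono.delay(Duration.ofMillis(20))   // simulated async call
                    // count the call as started when flatMap subscribes to it
                    .doOnSubscribe(s -> max.accumulateAndGet(inFlight.incrementAndGet(), Math::max))
                    // count it as finished when its value arrives, before flatMap sees completion
                    .doOnNext(v -> inFlight.decrementAndGet()),
                concurrency)
            .blockLast();
        return max.get();
    }
}
```

With `maxInFlight(50, 10)` the peak stays at the concurrency argument, which matches the observation that `flatMap(i -> sendMessage(), 10, 10)` is the setting that actually limits simultaneous sends.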

ruslan
@unoexperto

Folks, I just noticed that following code in my main data stream causes significant performance degradation (~2X)

.timeout(inactivityTimeout, Flux.defer {
    logger.trace("Stopping job after $inactivityTimeout of inactivity.")
    Flux.empty()
})

Async-profiler shows that significant amount of time is spent on java/util/concurrent/locks/LockSupport.unpark method. What am I missing here?

14 replies
Mantas Matūzas
@mantoshelis
Hello, we started to get WebClient exception reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response
Using wiretap: true does not show anything suspicious.
Error is thrown randomly. We are using httpClient.secure for configuring mutual authentication.
Is there anything that can be done?
2 replies
Tobi
@tobq

I recently switched my app over from webmvc to webflux, and tried to configure this

@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
                .allowedOrigins("*");
    }

    @Bean
    CorsWebFilter corsWebFilter() {
        var corsConfig = new CorsConfiguration();
        corsConfig.setAllowedOrigins(List.of("*"));

        var source = new UrlBasedCorsConfigurationSource();
        source.registerCorsConfiguration("/**", corsConfig);

        return new CorsWebFilter(source);
    }
}

and also tried adding @CrossOrigin to my controller.

But I'm still not getting the cross-origin response headers.

Anyone know the fix?

2 replies
Zane XiaoYe
@Zane-XY
is there a way to pause Flux.generate() if backpressure is triggered?
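For what it's worth, Flux.generate is already demand-driven: the generator callback is invoked once per requested element, so it effectively pauses whenever downstream stops requesting. The sketch below counts generator invocations against a subscriber that requests a fixed amount and then stays idle; `GenerateOnDemand` and `generatorCallsFor` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class GenerateOnDemand {
    // Returns how many times the generator ran for the given downstream demand.
    static int generatorCallsFor(int demand) {
        AtomicInteger calls = new AtomicInteger();
        // Infinite source: each generator call emits exactly one value.
        Flux<Integer> source = Flux.generate(sink -> sink.next(calls.incrementAndGet()));
        source.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(demand); // ask for a bounded amount, then never again
            }
        });
        return calls.get();
    }
}
```

If a pipeline still appears to run ahead, the cause is often an operator downstream of generate that requests eagerly (for example a large prefetch), rather than generate itself.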
Graham Pople
@programmatix

Hi :wave:. I'm an engineer with Couchbase, and I'm struggling with adding full thread-safety to our transactions library that is based around reactor. For some context, here is a sample transaction:

transactions.reactive().run((ctx) -> {

    // This chain is specified by the user, saying what they want to happen in the transaction.
    return Flux.fromIterable(ids)
            .parallel()
            .runOn(Schedulers.elastic())

            // User is doing concurrent INSERTs transactionally
            .flatMap(id -> ctx.query("INSERT..."))
            .then();
}).subscribe();

The ctx.query() internally needs to take a mutex lock for part of the work. (I’ve written a reactive-compatible mutex that keeps a list of lock-waiters - Sinks).

The problem is if one of those concurrent query operations fails. I understand that a stream can only raise one error. How this seems to be implemented is that if an error is raised past the concatMap, then all the remaining concurrent query operations simply stop at the end of their current operator - no error or complete signal is raised on them. This understanding is based on trial-and-error - am I correct on this?

This is causing me some real issues as I need those concurrent query ops to do some tidyup before dying - namely they have to remove themselves from the mutex (e.g. unlocking it if they have the lock, removing themselves from the list of waiters if they don’t). Otherwise subsequent rollback, which also needs the lock, deadlocks.

On non-query operations I use a workaround of waiting for the other ops to complete, before propagating the error (and also reject any new operations). But this doesn’t work for query, as some query errors are potentially not fatal to the transaction, and need to be raised out of the ctx.query() to give the user the chance to ignore it.

I hope that makes sense, let me know if there’s anything I need to clarify.

So - I’m not sure exactly what my request is. If I had a way to do some work just before an operation dies, that would work for me. Or perhaps there is an alternative way of doing things that I’m missing here - I feel this must be a relatively common situation, e.g. concurrent ops needing to do cleanup if one of them fails. Please let me know if you have any ideas.

Graham Pople
@programmatix
Nb I've since discovered doOnCancel and ErrorMode/delayErrors after putting a bunch of breakpoints in the reactive code. Exploring those now.
KrishnaRao Veeramachaneni
@OpenSourceTycoon
Hi team, I have a use case to consume a million Kafka (reactor-kafka) messages and call a REST API in parallel (TPS: 120) with multiple instances of a Java app. I'm using the same code as mentioned in https://stackoverflow.com/questions/54126263/invoking-non-blocking-operations-sequentially-while-consuming-from-a-flux-includ?rq=1 but it calls the API sequentially, not in parallel.
I'm not sure how to solve this puzzle. Can someone suggest something?
Thanks in advance.
I tested the Kafka part with 1 million messages and was able to consume them in 10 minutes.
When I integrate the API part, it takes 10 minutes for 1000 messages.
KrishnaRao Veeramachaneni
@OpenSourceTycoon
Any advice ?
Patrick Hahn
@xorph
We have two clients (fast) from which we retrieve some data and a repository (slow) where the results are stored. We tried to limit the requests based on repository performance with limitRate, but we see far more requests than expected. Right now our service is dying with out-of-memory errors because the repository is too slow. What are we missing here? Any advice?
clientA.retrieveFoos()
            .flatMap { foo -> clientB.retrieveBars(foo) }
            .flatMap { someSlowRepository.save(it) }
            .limitRate(10)
3 replies
Salman khandu
@salman-khandu
Does Spring WebClient support multiple redirects? If so, how many redirects does it support?
2 replies
Salman khandu
@salman-khandu

Issue with spring webclient subscribe(...)

I have setup spring cloud stream application. Here is my consumer spring function

@Bean
    public Consumer<Model> rssConsumer(final Handler handler) {
        return model -> {
            if (model != null) {
                LOGGER.info("Received  url: '{}' ", model.getUrl());
                handler.process(model.getUrl());
            }
        };
    }

I see that I have received 752 URLs and that they reach the webClient call, but in the subscribe block I am receiving 500 responses. I don't understand why I am not getting a response for every URL even though I received 752 messages.
Here is my webClient code as well as webClient configuration

public void execute(final String url) {
        webClient.get().uri(url)
                .exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class)
                        .flatMap(responseBody -> Mono
                                .just(new Model(responseBody, clientResponse.statusCode().value()))))
                .onErrorResume(exception -> Mono
                        .just(new Model(exception.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR.value())))
                .subscribe(Model -> {
                        // save data in file
                });

}
    @Bean
    public HttpClient httpClient() {
        return HttpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
                .responseTimeout(Duration.ofMillis(30000))
                .doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(30000, TimeUnit.MILLISECONDS))
                        .addHandlerLast(new WriteTimeoutHandler(30000, TimeUnit.MILLISECONDS)));
    }
    @Bean
    public WebClient webClient(HttpClient httpClient) {
        return WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();
    }
2 replies
Deepak Bhimaraju
@deepak-auto
Hi there, has the behaviour for Schedulers.onHandleError changed between reactor-core versions 3.3.5 and 3.4.12? We were on Spring Boot 2.3.x (and reactor 3.3.5) before and had a Spring Integration Test that threw an error from one of the dependencies used in the reactor chain. The test expected the Schedulers.onHandleError method to handle the error but after upgrading to Spring 2.6.1 (and reactor 3.4.12), the test has started failing. Upon debugging, I see that Hooks.onErrorDropped is receiving the error. Wanted to know your thoughts on this. Thanks!
ruslan
@unoexperto

Hi folks. I noticed interesting thing today. Consider following example:

val source = Flux.concat(
    Mono.fromCallable { "one" },
    Mono.fromCallable { "two" },
    Mono.fromCallable { "three" },
)

source
    .timeout(Duration.ofSeconds(5), Flux.defer {
        logger.trace("Stopping job after 5 seconds of inactivity.")
        Flux.empty()
    })
    .subscribe()

Turned out Mono.fromCallable { "three" } may never get to materialize if source was cancelled by timeout() downstream. Can I do something about it? My goal is to have finalizing Flux() at the end of the source that must emit some event.

5 replies
Roman Bykovskiy
@romanesko
Hello guys! Can't understand how to buffer output to use it in batches. Here is the question and example: https://stackoverflow.com/questions/70083756/project-reactor-buffer-with-parallel-execution
1 reply
ruslan
@unoexperto

Hey folks. Curious if you could advise me on my use-case. I have a pool of workers that upload files to a remote server. It's basically Flux that is derived from Sinks.Many<Job> which I convert to a parallel one with predefined parallelism. My goal is to upload all files as fast as possible, not upload each file as fast as possible. You can assume a remote server has unlimited scalability (AWS S3).

My dilemma is that I need to find a balance between the number of files I upload simultaneously and throughput. If parallelism is too low I won't utilize bandwidth fully. If parallelism is too high (= too many chunks being uploaded simultaneously) the throughput of each upload is too low which may cause timeout disconnection.

My question is: can I have dynamic parallelism in ParallelFlux, so that I can change the number of files being uploaded on the fly?

1 reply