Steve Storck
@Steve973
Hello. If I have a Flux or a Mono, and I need to use the finite data that it receives as a parameter in multiple subsequent Fluxes, how can I compose them to do that? I'd prefer not to block.
Steve Storck
@Steve973
Or, I should say, not to block explicitly, and not on the main thread.
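One non-blocking way to do this (a sketch with placeholder values, not from this thread) is to cache() the Mono so its result is computed once and replayed to every downstream composition:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ShareValueExample {
    public static void main(String[] args) {
        // cache() subscribes to the upstream once and replays the result
        // to every downstream subscriber, so the source is not re-triggered
        // and nothing blocks.
        Mono<String> shared = Mono.fromSupplier(() -> "config").cache();

        // Both downstream fluxes reuse the cached value.
        Flux<String> first = shared.flatMapMany(v -> Flux.just(v + "-a", v + "-b"));
        Flux<String> second = shared.flatMapMany(v -> Flux.just(v + "-c"));

        Flux.concat(first, second).subscribe(System.out::println);
    }
}
```

The key point is that without cache(), each flatMapMany would re-subscribe to (and potentially re-trigger) the source.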
Nuwan Sanjeewa Abeysiriwardana
@nuwansa

@KATKrazy

Hi,
I'm trying to figure out how to ensure the work on the other thread is finished when I use publishOn:
Flux.just(o)
.doOnNext(System.out::println)
.publishOn(Schedulers.single())
.doOnNext(o -> process(o))
.subscribe();
I was wondering how to make sure the process function has finished when I use subscribe() instead of block()

Wouldn't another doOnNext after .doOnNext(o -> process(o)) do it?
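Another option for the quoted question (a minimal sketch, not from the thread): a doOnNext only observes signals on the pipeline's own thread, but if the subscribing thread needs to wait for completion without block(), a doFinally paired with a CountDownLatch works:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class AwaitCompletionExample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);

        Flux.just("a", "b")
            .publishOn(Schedulers.single())         // processing runs off the caller thread
            .doOnNext(o -> System.out.println("processed " + o))
            .doFinally(signal -> done.countDown())  // fires on complete, error, or cancel
            .subscribe();

        // The caller can wait for the pipeline to terminate without block():
        done.await(5, TimeUnit.SECONDS);
        System.out.println("pipeline finished");
    }
}
```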

vsaji
@vsaji
Hi. I have a scenario whereby I need to make an async DB call in the Mono pipeline. What is the right choice: Mono.fromFuture or Mono.subscribeOn(Scheduler.xxx)? Please help.
4 replies
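As a rough rule of thumb (sketch below, with stand-in query methods): Mono.fromFuture fits a driver that is already asynchronous, while subscribeOn(Schedulers.boundedElastic()) is for offloading a call that blocks:

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class AsyncDbExample {
    // Stand-in for a driver that is already asynchronous.
    static CompletableFuture<String> asyncQuery() {
        return CompletableFuture.supplyAsync(() -> "row-from-async-driver");
    }

    // Stand-in for a blocking call (e.g. plain JDBC).
    static String blockingQuery() {
        return "row-from-blocking-driver";
    }

    public static void main(String[] args) {
        // Driver already async: just adapt the future; no extra scheduler needed.
        Mono<String> fromFuture = Mono.fromFuture(AsyncDbExample::asyncQuery);

        // Driver blocking: wrap the call and move the subscription to
        // boundedElastic so the blocking work stays off event-loop threads.
        Mono<String> offloaded = Mono.fromCallable(AsyncDbExample::blockingQuery)
                .subscribeOn(Schedulers.boundedElastic());

        // block() here only for the demo's sake.
        System.out.println(fromFuture.block());
        System.out.println(offloaded.block());
    }
}
```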
kitkars
@kitkars
Hi Team,
What is the difference between FluxOperator and the reactive-streams Processor? FluxOperator behaves like a processor, but it is not one.
harrisrahman
@harrisrahman
Hi, I have a very plain WebFlux controller with a GET method that takes just a query param and returns a Mono<String>. The method just returns "Hello" and there is no other logic. When I tried performance testing this WebFlux server using Gatling, the performance was very poor.
[screenshot: Gatling performance results]

Here are netty startup logs

2021-01-02 22:34:37.861 DEBUG 29069 --- [ main] r.netty.resources.DefaultLoopEpoll : Default Epoll support : false
2021-01-02 22:34:37.863 DEBUG 29069 --- [ main] r.netty.resources.DefaultLoopKQueue : Default KQueue support : false
2021-01-02 22:34:37.871 DEBUG 29069 --- [ main] i.n.channel.MultithreadEventLoopGroup : -Dio.netty.eventLoopThreads: 24
2021-01-02 22:34:37.899 DEBUG 29069 --- [ main] i.n.u.internal.InternalThreadLocalMap : -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
2021-01-02 22:34:37.899 DEBUG 29069 --- [ main] i.n.u.internal.InternalThreadLocalMap : -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
2021-01-02 22:34:37.908 DEBUG 29069 --- [ main] io.netty.channel.nio.NioEventLoop : -Dio.netty.noKeySetOptimization: false
2021-01-02 22:34:37.908 DEBUG 29069 --- [ main] io.netty.channel.nio.NioEventLoop : -Dio.netty.selectorAutoRebuildThreshold: 512
2021-01-02 22:34:37.919 DEBUG 29069 --- [ main] i.netty.util.internal.PlatformDependent : org.jctools-core.MpscChunkedArrayQueue: available
2021-01-02 22:34:37.987 DEBUG 29069 --- [ main] io.netty.util.ResourceLeakDetector : -Dio.netty.leakDetection.level: simple
2021-01-02 22:34:37.987 DEBUG 29069 --- [ main] io.netty.util.ResourceLeakDetector : -Dio.netty.leakDetection.targetRecords: 4
2021-01-02 22:34:38.124 DEBUG 29069 --- [ main] io.netty.channel.DefaultChannelId : -Dio.netty.processId: 29069 (auto-detected)
2021-01-02 22:34:38.130 DEBUG 29069 --- [ main] io.netty.channel.DefaultChannelId : -Dio.netty.machineId: 38:f9:d3:ff:fe:22:64:e8 (auto-detected)
2021-01-02 22:34:38.166 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.numHeapArenas: 24
2021-01-02 22:34:38.166 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.numDirectArenas: 24
2021-01-02 22:34:38.166 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.pageSize: 8192
2021-01-02 22:34:38.166 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.maxOrder: 11
2021-01-02 22:34:38.167 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.chunkSize: 16777216
2021-01-02 22:34:38.167 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.tinyCacheSize: 512
2021-01-02 22:34:38.167 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.smallCacheSize: 256
2021-01-02 22:34:38.167 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.normalCacheSize: 64
2021-01-02 22:34:38.167 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.maxCachedBufferCapacity: 32768
2021-01-02 22:34:38.167 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.cacheTrimInterval: 8192
2021-01-02 22:34:38.167 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.cacheTrimIntervalMillis: 0
2021-01-02 22:34:38.167 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.useCacheForAllThreads: true
2021-01-02 22:34:38.167 DEBUG 29069 --- [ main] io.netty.buffer.PooledByteBufAllocator : -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
2021-01-02 22:34:38.175 DEBUG 29069 --- [ main] io.netty.buffer.ByteBufUtil : -Dio.netty.allocator.type: pooled
2021-01-02 22:34:38.175 DEBUG 29069 --- [ main] io.netty.buffer.ByteBufUtil : -Dio.netty.threadLocalDirectBufferSize: 0
2021-01-02 22:34:38.176 DEBUG 29069 --- [ main] io.netty.buffer.ByteBufUtil : -Dio.netty.maxThreadLocalCharBufferSize: 16384
2021-01-02 22:34:38.203 DEB

harrisrahman
@harrisrahman
Please let me know if I am missing any netty config that causes this slowdown.
saikumard17
@saikumard17
Hi team
I want to access value generated in Mono later in the reactive pipeline
Mono.just(nameService.getName()).map(name -> addressService.getAddress(name)).map(address ->postService.postLetter(name,address))
how can I access name to pass to postLetter, which is generated by nameService, without calling it a second time?
4 replies
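One common pattern (a sketch with stand-in services, assuming the calls are synchronous) is to nest the later steps inside the first lambda so `name` stays in lexical scope and is computed only once:

```java
import reactor.core.publisher.Mono;

public class NestedScopeExample {
    // Hypothetical stand-ins for nameService, addressService, postService.
    static String getName() { return "alice"; }
    static String getAddress(String name) { return name + "@main-st"; }
    static String postLetter(String name, String address) {
        return "posted to " + name + " at " + address;
    }

    public static void main(String[] args) {
        // Nesting keeps `name` in scope for the final step without a
        // second call to the name service.
        Mono<String> result = Mono.fromSupplier(NestedScopeExample::getName)
                .flatMap(name ->
                        Mono.fromSupplier(() -> getAddress(name))
                                .map(address -> postLetter(name, address)));

        System.out.println(result.block());
    }
}
```

Alternatives include carrying both values forward in a Tuple2 via zipWith, but the nested-lambda form is usually the most readable.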
pandepra
@pandepra
What's the best way to finish processing of ongoing events after calling Disposable.dispose()? I have an application that consumes from Kafka and runs in a container. I want to register a JVM shutdown hook that cancels that subscription and gracefully terminates the stream (terminate it only after processing any ongoing events).
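One possible approach (a sketch, with Flux.interval standing in for the Kafka receiver) is to signal completion through a Sinks.Empty and takeUntilOther instead of calling dispose(), so in-flight processing drains before the stream terminates:

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class GracefulShutdownExample {
    public static void main(String[] args) throws InterruptedException {
        Sinks.Empty<Void> shutdown = Sinks.empty();
        CountDownLatch terminated = new CountDownLatch(1);

        Flux.interval(Duration.ofMillis(10))           // stand-in for the Kafka flux
            .takeUntilOther(shutdown.asMono())         // completes rather than cancels
            .concatMap(e -> Flux.just("processed " + e)) // stand-in for per-event work
            .doFinally(sig -> terminated.countDown())
            .subscribe(System.out::println);

        Thread.sleep(100);
        // In a real app this would live in Runtime.getRuntime().addShutdownHook(...)
        shutdown.tryEmitEmpty();
        terminated.await();
        System.out.println("stream terminated gracefully");
    }
}
```

The difference from dispose() is that takeUntilOther injects an onComplete, so downstream operators get a normal termination signal instead of a cancellation.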
Thiago Queiroz
@thigorqueiroz
Guys, can you recommend a reference implementation of a REST API using Reactor?
Matt Mitchell
@mwmitchell
I have a stream of objects, some of which need to be simply saved in a database, and some of which need to be batched, then POSTed to a REST endpoint. Is there a way to do something like that with reactor? It seems like the items that need to be batched could be a sub-stream or something, but unsure of how to do that at this point.
1 reply
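One way to sketch this (with a hypothetical Item type and the side effects replaced by strings) is a groupBy on the routing predicate, batching only the REST-bound group:

```java
import reactor.core.publisher.Flux;

public class SplitStreamExample {
    // Hypothetical item type; needsBatching() is the routing predicate.
    record Item(int id, boolean needsBatching) {}

    public static void main(String[] args) {
        Flux<Item> source = Flux.range(1, 6)
                .map(i -> new Item(i, i % 2 == 0));

        source.groupBy(Item::needsBatching)
              .flatMap(group -> group.key()
                      // REST-bound branch: collect into batches, then "POST"
                      ? group.buffer(2)
                             .map(batch -> "POST batch "
                                     + batch.stream().map(Item::id).toList())
                      // DB-bound branch: save items one by one
                      : group.map(item -> "saved " + item.id()))
              .subscribe(System.out::println);
    }
}
```

With only two groups, flatMap's default concurrency is fine; with many groups, groupBy needs each group consumed promptly or it can stall.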
Matt Mitchell
@mwmitchell
Ok next challenge... I have a stream of items. I'm batching the items using bufferTimeout so I can get at least n items in a batch, or as many as possible within the time limit. This is followed by a repeat() so the process continues. I'd like to cancel the subscription if the time since the last non empty batch was greater than x. Is this possible to do without resorting to explicitly implementing my own state management?
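One possibility for the idle-cancel requirement (a sketch with simulated timings) is to filter out empty batches and put a timeout after the filter, so the pipeline cancels itself when no non-empty batch arrives within the idle limit:

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Flux;

public class IdleTimeoutExample {
    public static void main(String[] args) {
        // Batches arrive, then the source goes quiet.
        Flux<List<Integer>> batches = Flux.range(1, 10)
                .bufferTimeout(5, Duration.ofMillis(50))
                .concatWith(Flux.never());            // simulates an idle period

        batches.filter(batch -> !batch.isEmpty())     // ignore empty batches
               .timeout(Duration.ofMillis(200))       // cancel upstream if idle too long
               .onErrorResume(TimeoutException.class,
                       e -> Flux.empty())             // treat idle timeout as completion
               .doOnNext(System.out::println)
               .blockLast();                          // demo only; keeps main alive
    }
}
```

The state (time since last non-empty batch) lives entirely inside the timeout operator, so no explicit state management is needed.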
Matt Mitchell
@mwmitchell
Is it possible to ensure that all items are processed in order, even if you have multiple subscribers handling different types of values? I have 1 subscriber that does a flatMap and returns a Mono.fromCompletionStage(...), which does some work and blocks, and in the meantime, the producer keeps pushing values and the other subscribers end up finishing before the one that's doing work (blocking) - how can I force the others to wait? I'm "joining" them all using Flux.merge() - is there a better way?
Paul Newman
@fogwolf_gitlab
Hi folks. I have a Reactor Kafka stream processing app and am struggling a bit to get my head around how, or if, I can write unit tests specifically for the Reactor code. I am able to inject the KafkaReceiver and KafkaSender, so I can mock them out and control the lower-level flux of events that will be processed. What I don't quite understand is how I could use StepVerifier to test the code running within my processor method, since the processing method doesn't return a Flux for me to plug into the StepVerifier (and KafkaReceiver and KafkaSender are not Publishers, so I can't plug mocks of those into a StepVerifier in my test code). Does that make sense? I'd like to test things like whether onErrorContinue is actually working in my processor and keeps processing even when it encounters an error. Is there any way for me to do this?
1 reply
Dineth Senevirathne
@hdineth
Can someone recommend a deep-dive ebook or tutorial on Spring WebFlux?
Smykk
@Smykk

Hi, @joshlong

I want to build a kind of priority-based message exchange mechanism in Reactor. Messages arrive in the form of an infinite stream. I'd like messages to have a different limitRate() applied depending on the priority they carry. The total pressure generated on the consumer by all priorities should be constant. I managed to achieve a half-baked solution (see below) using window, but waiting for the window to complete actually seems to hurt performance.

Three questions here:

  1. Is there a way to conditionally apply a different limitRate() without using window and all the modifications around it?
  2. Do you have any advice on testing these parallel processes execution time - is logging accurate enough?
  3. If something feels incredibly wrong here - shoot!
public Flux<ResponseMessage> applyPrio(Flux<RequestMessage> inflow) {
        return inflow.window(Duration.ofMillis(windowDurationMillis))
                .flatMap(windowFlux -> windowFlux
                        .sort(Comparator.comparing(RequestMessage::getPriority))
                        .groupBy(RequestMessage::getPriority)
                        .flatMap(gf -> gf
                                .publishOn(Schedulers.parallel())
                                .onBackpressureBuffer()
                                .flatMap(exchangeFunction::apply)
                                .limitRate(this.priorityToRate.apply(gf.key()), this.priorityToRate.apply(gf.key())))
                        );
    }
kitkars
@kitkars
Hi guys... hooks, are they global? Is there any way to add a hook to only a specific pipeline?
Sergio Bilello
@sbilello
Is there any good book about this?
Not documentation-like, but an explanation of the common pitfalls and concepts
Things that everyone should take care of...
This has been a great read; any other resources to share?
KATKrazy
@KATKrazy

@kitkars
Maybe you can do this:

Mono.just(X).subscribe(new HookSubscriber<>());

public class HookSubscriber<T> extends BaseSubscriber<T> {
    @Override
    protected void hookOnNext(T value) {
        // do something
    }
}

Not sure I understood you correctly.
Sergio Bilello
@sbilello
What's the best way to deal with rate limiting server-side in Spring WebFlux?
I found this one; don't we have a library for it?
Daan Kerkhofs
@kerkhofsd

Given a cold TestPublisher which is created with e.g. 100 items available and then completes.
Is there a way to leverage TestPublisher and StepVerifier to assert the number of items requested from this TestPublisher source?

I see there are the assertMinRequested(min) and assertMaxRequested(max) methods; however, my first impression is that these indicate the number of "open" requests and are therefore always 0 in my setup with a pre-created source.

Example code:

    @Test
    public void test() {
        TestPublisher<Integer> publisher = TestPublisher.createCold();
        Flux.range(1, 100).doOnNext(publisher::emit).subscribe();
        publisher.complete();

        StepVerifier.create(publisher)
                .thenRequest(1)
                .expectNextCount(1)
//                .then(() -> publisher.assertMinRequested(1)) // FAILS
                .thenRequest(10)
                .expectNextCount(10)
//                .then(() -> publisher.assertMinRequested(10)) // FAILS
                .thenRequest(Long.MAX_VALUE)
                .expectNextCount(89)
                .expectComplete()
                .verify();
    }

My goal here is to be able to test that the upstream is not consumed in an unbounded way when I .transform(...) it with a custom component.

28 replies
Daan Kerkhofs
@kerkhofsd
Is there a way to transform a Flux<List<T>> into a Flux<T>, while respecting backpressure? (a "split" / inverse buffer operation)
I've been trying the flatMap / flatMapIterable / concatMapIterable operators, but my testing (see previous question) suggests they all consume the upstream in an unbounded way...
flux.concatMapIterable(Function.identity(), 1);
Dominik Jurasz
@Faelivrinx
Is there any documentation/topic about migrating from TopicProcessor to the new API in version 3.4.2?
Sagar Raj
@SagarrajRaj_twitter
I have metrics configured for my reactive stream in the following manner. I am observing that only the first metrics are getting published. Could someone help me understand if this is expected? Here is the structure of my reactive sequence.
        flux
                .log()
                .flatMap(FluxMetrics::complexMono1)
                .name("pipeline")
                .tag("operation","complexMono1")
                .metrics()
                .flatMap(FluxMetrics::complexMono2)
                .name("pipeline")
                .tag("operation","complexMono2")
                .metrics()
                .flatMap(FluxMetrics::complexMono3)
                .name("pipeline")
                .tag("operation","complexMono2")
                .metrics()
                .subscribe(System.out::println);
Sagar Raj
@SagarrajRaj_twitter
Basically, what I was expecting is that I would be able to see the metrics data flowing through each flatMap.
Naresh Ankam
@nareshankam
Hello, I have a microservice which takes requests from clients (app1) and sends those requests to another microservice (external); whatever response it gets from the external microservice is sent back to the clients. My microservice acts as a middleman. Neither the clients nor the external microservice use Reactor. Can I use Reactor in my microservice without affecting the others?
Timur Shaidullin
@timur-sh

Hi.

Could anyone explain what "non-blocking backpressure" is? I've read a lot of topics, but no one explains what blocking versus non-blocking backpressure is, and there were especially no examples of it.

3 replies
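In short: the subscriber signals demand with request(n), and the publisher holds data back until demand arrives, rather than any thread sleeping or blocking on a full queue. A minimal sketch with BaseSubscriber:

```java
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackpressureExample {
    public static void main(String[] args) {
        // The subscriber controls the pace by calling request(n); the
        // publisher never blocks a thread waiting for the consumer — it
        // simply does not emit until demand arrives. That is
        // "non-blocking backpressure".
        Flux.range(1, 5).subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1);                 // initial demand: one element
            }

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("got " + value);
                request(1);                 // ask for the next one only when ready
            }
        });
    }
}
```

A "blocking" scheme by contrast would be something like a bounded BlockingQueue, where the producer thread parks in put() when the consumer falls behind.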
Naresh Ankam
@nareshankam
Hi,
Do Mono.block() and Flux.block() block the thread (waiting) as well as the flow of execution?
Sagar Raj
@SagarrajRaj_twitter


Could someone please help with understanding if it is expected to see only the first metrics? My assumption was first metrics after first flatmap gives insight on the first flatmap operator and so on. What would be the right place to have this metrics if not here?

Sagar Raj
@SagarrajRaj_twitter
Hello,
I think having the same name for the metrics leads to only the first one being published.
I could plot the requested vs onNext count. The onNext count seems to be way above the request count. Can this happen? The onNext count is around 721k while the request count is around 480k. Not really sure how this is happening.
Could someone please help
Simon Baslé
@simonbasle

Hi,
Do Mono.block() and Flux.block() block the thread (waiting) as well as the flow of execution?

yes it does, it turns reactive code into imperative blocking code

@SagarrajRaj_twitter are you really reusing the same name? In that case, the metrics would at best be mixed together
Simon Baslé
@simonbasle
mmh I think I see what happens @SagarrajRaj_twitter ... there is probably a big limitation in how reactor-core associates tags() with metrics(): it walks the whole line of parents above the metrics operator and finds ALL the tags, from downmost to topmost. So e.g. the third metrics() sees tag values complexMono3, then complexMono2, then complexMono1, and ends up using the latter :(
Sagar Raj
@SagarrajRaj_twitter
Thank you for the response. I see, that makes sense. When reusing the same name, I thought having a separate tag would result in new metrics with a different dimension in CloudWatch. Only the first dimension showed up. It's perfectly in line with your explanation :)
Looks like the mixup is causing the incorrect information on the dashboard.
Sagar Raj
@SagarrajRaj_twitter
I also have a question related to callbacks. Could you please help :)
I am using async HTTP clients which require passing an executor to which the result will be dispatched. What executor type is recommended here? When I use work stealing, I see the threads getting blocked under high TPS. I am not doing any heavy lifting except for logging (log4j2 async). I used it thinking it would be similar to parallel. Should I switch to a fixed executor pool or something similar?
Simon Baslé
@simonbasle
@SagarrajRaj_twitter I think the situation can be improved, though. When metrics resolves the above tags, it should prioritize values that are set closer to it. Opened reactor/reactor-core#2560
Sagar Raj
@SagarrajRaj_twitter
Awesome! Thank you!! 🙏
Sagar Raj
@SagarrajRaj_twitter
Also, since I am using a sequence of flatMaps, I was wondering whether this could be caused by operator fusion. I am new to Reactor; could you share some insight? :) I want to know how operator fusion comes into the picture in the above example and how metrics fit into it.
1 reply