mplain
@mplain
i need a minute to process and try to understand that...
you're saying that as soon as I send a server response, the body disappears, and I won't be able to process it?
so I need to receive the body first, then send the server response, then process the body and make an async call to the database?
if so, then the correct way to process this request would be... something like this?
@Bean
fun audit(
    repository: AuditEventRepository
) = router {
    POST("/audit") { request ->
        request.bodyToMono<AuditEvent>()
            .doOnNext {
                Mono.fromCallable { repository.save(it) }
                    .retryWhen(Retry.backoff(3, Duration.ofMinutes(1)))
                    .subscribeOn(Schedulers.elastic())
                    .subscribe(
                        { logger.info("Saved audit entity: $it") },
                        { logger.error("Error saving audit entity", it) }
                    )
            }
            .flatMap { ServerResponse.ok().build() }
    }
}
Violeta Georgieva
@violetagg
@mplain Well you have this from the RFC https://tools.ietf.org/html/rfc7230#section-6.3
A server MUST read the entire request message body or close the connection after sending its response, since otherwise the remaining data on a persistent connection would be misinterpreted as the next request.
mplain
@mplain
interesting, thank you
i'm still not sure what's the right way to do an async task inside a publisher though =_=
Violeta Georgieva
@violetagg
instead of request can’t you ask Spring Framework to inject the body?
Mmm you are using functional ...
mplain
@mplain
by inject you mean @RequestBody ?
Violeta Georgieva
@violetagg
yes
mplain
@mplain
i'd rather like to understand better how reactor works
i do not fully understand this:
@adrianhj88 Calling subscribe within a reactive operator does not give any guarantee about doSomething being finished before the operator is done. In this case it might not be a big deal because you're doing Mono.just. But even if you're aware of that limitation, it could behave in strange ways during tests: let's say a test checks the behavior of your reactive pipeline. The JVM might exit before doSomething is done, since nothing is waiting for its completion signal.
Because of that, and because newcomers tend to call subscribe instead of chaining operations with .then(), IntelliJ IDEA is treating that as a warning (and it was our advice to do so).
basically, let's pretend that I don't need the request body at all
@Bean
fun audit(
    repository: AuditEventRepository
) = router {
    POST("/audit") { request ->
        Mono.defer { Mono.just(repository.save("hello")) }
            .subscribeOn(Schedulers.elastic())
            .subscribe()
        ServerResponse.ok().build()
    }
}
this would still give the warning Calling 'Subscribe' in non-blocking scope
Brian Clozel
@bclozel
Assuming your repository implementation is blocking (and that would be why you're trying to offload that to another scheduler):
@Bean
fun audit(
    repository: AuditEventRepository
) = router {
    POST("/audit") { request ->
        Mono.fromCallable { repository.save("hello") }
            .subscribeOn(Schedulers.elastic())
            .then(ServerResponse.ok().build())
    }
}
Brian Clozel
@bclozel
Now back to the Calling subscribe within a reactive operator... explanation, let's put it another way. When a request comes in, if you call subscribe() on something consuming the request body, and on the other hand you return the response independently - it would be like, in the Servlet world, returning the response and yet kicking off work in a new thread that's not monitored/managed by anything. If your app shuts down before the work is done, everything is lost. If that job is operating on the request data directly and the memory associated with the request is "recycled" by the server, your job will fail on invalid data.
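
A minimal sketch combining both points (written in Java rather than the Kotlin DSL above, and not part of the original answer; Schedulers.boundedElastic() is assumed as the current replacement for elastic()): the body is read first, the blocking save is chained into the pipeline, and only then is the response built, so nothing is fired and forgotten.

RouterFunction<ServerResponse> audit(AuditEventRepository repository) {
    return route(POST("/audit"), request ->
            request.bodyToMono(AuditEvent.class)
                    // the blocking save is part of the chain instead of a detached subscribe()
                    .flatMap(event -> Mono.fromCallable(() -> repository.save(event))
                            .subscribeOn(Schedulers.boundedElastic()))
                    // the response is built only after the body is consumed and the save completed
                    .then(ServerResponse.ok().build()));
}
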
mplain
@mplain
@bclozel thank you!!
Oemer Genc
@oemergenc

Hi, currently i need some help with GroupedFlux. What i want is to group messages by id and then process a specific number of messages for each id, let's say 3 messages for each id.
I have the following code:

        Flux.fromIterable(messages)
                .groupBy { it.key }
                .concatMap { it.collectList() }
                .flatMap { Flux.fromIterable(it.take(3)) }
                .subscribe { println(it.key) }

This works; however, if the grouped fluxes contain a lot of messages, the collectList() takes very long.
As I am only interested in 3 messages of each group, this seems unnecessary, but I cannot find a proper way to prevent it.
Something like ....concatMap { it.take(3).collectList() } always results in the following error:

Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.

Anyone an idea how this can be done?
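
One possible direction, sketched here in Java and not a verified answer: take three elements of each group directly instead of collecting the whole group. Caveat: cancelling a group (which take(3) does) makes groupBy open a fresh group the next time that key appears, so this only yields exactly three per key when a key's messages arrive together or duplicate groups are filtered afterwards.

Flux.fromIterable(messages)
        .groupBy(m -> m.key)
        // take(3) cancels each group after three elements, so no collectList() over the full group
        .flatMap(group -> group.take(3))
        .subscribe(m -> System.out.println(m.key));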

Leonard Brünings
@leonard84

Hi, I have this code to calculate a regularly updating status as a hot producer.

Flux.concat(initialAggregatedStatus(),
                Flux.interval(Duration.ZERO, properties.getUpdateInterval())
                        .onBackpressureDrop(intervalCount -> {
                            log.warn("Too many connections pending, dropped status update request {}", intervalCount);
                            droppedUpdateCounter.increment();
                        })
                        .concatMap(ignore -> updateStatus()))
                .subscribeOn(Schedulers.parallel())
                .cache(1)

what is the correct way to stop this? takeWhile(ignore -> running) or .takeUntilOther(killSwitch) with private DirectProcessor<String> killSwitch = DirectProcessor.create();

Flux.concat(initialAggregatedStatus(),
                Flux.interval(Duration.ZERO, properties.getUpdateInterval())
                        .onBackpressureDrop(intervalCount -> {
                            log.warn("Too many connections pending, dropped status update request {}", intervalCount);
                            droppedUpdateCounter.increment();
                        })
                        .concatMap(ignore -> updateStatus()))
                .takeWhile(ignore -> running)
                .subscribeOn(Schedulers.parallel())
                .cache(1);

Or something else?

2 replies
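
A sketch of the takeUntilOther variant (assuming a Reactor version where Sinks replaces the now-deprecated DirectProcessor; Status is a placeholder type and the other names are from the snippet above). takeWhile only evaluates its predicate when the source emits, so the stream keeps running until the next interval tick, whereas an explicit kill switch completes it as soon as the signal fires.

Sinks.Empty<Void> killSwitch = Sinks.empty();

Flux<Status> status = Flux.concat(initialAggregatedStatus(),
                Flux.interval(Duration.ZERO, properties.getUpdateInterval())
                        .onBackpressureDrop(intervalCount -> droppedUpdateCounter.increment())
                        .concatMap(ignore -> updateStatus()))
        // completes the stream as soon as the kill switch emits, independent of the interval
        .takeUntilOther(killSwitch.asMono())
        .subscribeOn(Schedulers.parallel())
        .cache(1);

// on shutdown:
killSwitch.tryEmitEmpty();
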
Jørgen Ringen
@JorgenRingen
Using reactor-kafka, is there any way to continue on deserialization errors (kind of like log-and-continue-exception-handler in kafka-streams)?
Currently getting r.k.r.i.DefaultKafkaReceiver$PollEvent: Unexpected exception and it makes the whole "pipeline" fail.
1 reply
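
One common workaround, sketched as an assumption rather than a reactor-kafka feature: wrap the value deserializer so a failure surfaces as a null record that can be logged and filtered out downstream, instead of failing the poll.

public class LogAndContinueDeserializer<T> implements Deserializer<T> {

    private final Deserializer<T> delegate;

    public LogAndContinueDeserializer(Deserializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (Exception e) {
            // log and continue: the record arrives as null and can be filtered out,
            // e.g. receiver.receive().filter(r -> r.value() != null)
            return null;
        }
    }
}
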
Alwyn Schoeman
@alwyn

Hi, I'm looking for the correct pattern to do the following:

I need to do 3 or more operations against an external system. The inputs and outputs of each operation are different from one another, and subsequent operations depend on the outputs of previous operations.

E.g. A -> B -> C -> D

Each following step checks a response code of the previous step.
If not successful it should not continue with the rest of the processing, but return the response of the previous operation.
If successful, it should use data from the previous response and perform the next action.

I am trying to avoid the scenario where I have the following nesting (pseudocode):

doA.flatMap { r ->
           if (r.code != 0) return@flatMap Mono.just(r)
           doB.flatMap { r ->
               if (r.code != 0) return@flatMap Mono.just(r)
               doC.flatMap {r ->
                   if (r.code != 0) return@flatMap Mono.just(r)
etc, etc, etc, etc
1 reply
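
One way to keep the chain flat, sketched with the pseudocode's names (Response, code, and doA..doD are placeholders): extract the "stop on non-zero code" decision into a helper so a failing response simply passes through the remaining flatMaps unchanged.

static Mono<Response> thenIfOk(Response previous, Supplier<Mono<Response>> next) {
    // short-circuit: a non-zero code is carried through, the next call is never made
    return previous.code != 0 ? Mono.just(previous) : next.get();
}

Mono<Response> result = doA()
        .flatMap(a -> thenIfOk(a, () -> doB(a)))
        .flatMap(b -> thenIfOk(b, () -> doC(b)))
        .flatMap(c -> thenIfOk(c, () -> doD(c)));
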
Alex Mrynsky
@amrynsky
looking for a "reactive" way to work with piped input/output streams to compress data using GZIPOutputStream. Any hint is appreciated
Tomas Krasny
@krasnyt
Hi, can someone explain why the following code doesn't block when sending the 11th item? According to the docs, the EmitterProcessor (if it has no subscribers) buffers incoming items up to a bufferSize (which I set to 10). The following onNext should block (until it's drained) but it doesn't. Instead it blocks on inserting the 17th element. Why does it successfully insert 16 elements? Why 16? (v3.3.10.RELEASE)
public static void main(String[] args) {
    EmitterProcessor<Integer> processor = EmitterProcessor.create(10);

    for (int i = 1; i <= 20; i++) {
        processor.onNext(i);
        System.out.println("sent: " + i);
    }

    processor.subscribe(System.out::println);
}
Giorgi Andriadze
@andriadze
Hi, I want to understand how big of a performance impact Mono.delay() has. I may have to create around 10k Monos with a random delay of a couple of seconds. Is that safe, or will that create a lot of threads? Sorry if it's a silly question, I'm new to this.
2 replies
Sagar Raj
@SagarrajRaj_twitter
Hello, I am building a reactive pipeline: I read from an SQS queue and process it within the main pipeline. The data is grouped and the grouped flux is subscribed. In my onNext, I receive a grouped flux, process its content, and pass it on to another consumer. As the subscription here is happening within onNext, I am guessing I should be creating a new instance of the consumer rather than autowiring it, right? Looking for some guidance on this :)
sushobhitrajan
@sushobhitrajan
Hi Team - I have a use case where I need to run parallel WebClient calls to 10 upstream systems with a 450ms timeout; a few of the upstream systems return results in 80-150ms at p99 latency and a few take around ~300ms. I need to collect the data from each and then do the ranking. If I use collect() and then go for the ranking, will that block the thread, since I need to get the data from all of them first? Can someone explain the thread flow here? Thanks in advance.
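
A sketch of that flow (the URIs, Result type, and rank method are placeholders): the ten calls run concurrently, each with its own timeout, and collectList() does not block a thread; it simply completes once the merged flux completes, after which the ranking runs on whichever thread delivered the last response.

Mono<List<Result>> ranked = Flux.fromIterable(upstreamUris)
        .flatMap(uri -> webClient.get().uri(uri)
                .retrieve()
                .bodyToMono(Result.class)
                .timeout(Duration.ofMillis(450))
                // a slow or failing upstream simply drops out of the ranking
                .onErrorResume(e -> Mono.empty()))
        .collectList()
        .map(results -> rank(results));
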
Chakradhar Kasturi
@chakas

Hi, I want help in understanding whether we can maintain trace/span IDs in the entire pipeline.

For e.g.,

Flux.just(1,2,3)
            .map(i -> i * 2)
            .filter(i -> i > 3)
            .subscribe(System.out::println);

I want to send a signal all the way downstream, a kind of unique ID (span/trace ID) for each element emitted. I am guessing it becomes tricky in the case of flatMaps or schedulers?

I tried using Hooks and MutableContext.
I don't want to change the existing pipeline; that's why I thought Hooks are best for my problem. Any idea is highly appreciated.

1 reply
Sagar Raj
@SagarrajRaj_twitter
Hi, I am looking for a recommendation on (groupBy & subscribe) vs (flux multicast and filter) to take different actions on different event types on the flux. Could someone please help @simonbasle
mplain
@mplain

hello!
I need to take an object, send it to a server (using a WebClient), log the response, then do some work with it

Mono.just(something)
   .doOnNext(this::sendAsyncAndLogResponse)
   .doOnNext(this::doSomeWork)
   .subscribe()

Problem is I don't see the server response in the logs
I assume I should not be calling WebClient.exchange() inside a .doOnNext()

What would be the correct way to do this?

1 reply
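
A sketch of one way to do it (the WebClient call details are placeholders): make the call part of the chain with flatMap so the response Mono is actually subscribed, log it, and pass the original object on to the next step.

Mono.just(something)
        .flatMap(s -> webClient.post().uri("/endpoint")
                .bodyValue(s)
                .retrieve()
                .bodyToMono(String.class)
                .doOnNext(response -> logger.info("server response: {}", response))
                // keep passing the original object downstream
                .thenReturn(s))
        .doOnNext(this::doSomeWork)
        .subscribe();
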
Sagar Raj
@SagarrajRaj_twitter

Hi, I am looking for a recommendation on (groupBy & subscribe) vs (flux multicast and filter) to take different actions on different event types on the flux. Could someone please help?
https://stackoverflow.com/questions/61814106/choice-between-groupby-and-filter-with-infinite-streams-in-reactor

I am starting with reactive development, and I believe the approach is to share the flux and filter it in each of the pipelines. I understand that the thread count would be lower, but would this not increase CPU usage, as the computations are a lot higher? (I would use a switch case in the regular programming model.)

Sagar Raj
@SagarrajRaj_twitter

The incoming messages are of the type

 "messageType": "lightconfig",
  "state": "on"/"off",
  "deviceId": "A0001"
}

I guess there would be like 15-20 categories eventually. So the common part is retrieving the device details. After that, I see 2 options,
  1. Do the common part of the pipeline and share the observable. Then each subscriber can listen to this. Say there are 20 observers and 100 events; we would be running the filter computation 2000 times.
  2. Do the common part of the pipeline, use group-by to group the observable, and subscribe to the grouped observable with an observer. The observer will receive the Map<messageType, Observable<Message>>.

5 replies
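
A sketch of the two options above (message types, the enrichment step, and handler methods are placeholders):

// Option 1: do the common part once, share it, and let each consumer filter its type
Flux<Message> shared = incoming
        .flatMap(this::retrieveDeviceDetails)
        .share();
shared.filter(m -> "lightconfig".equals(m.getMessageType())).subscribe(this::handleLightConfig);
shared.filter(m -> "otherType".equals(m.getMessageType())).subscribe(this::handleOther);

// Option 2: group by type once; each GroupedFlux carries its key and is handled in one place
incoming.flatMap(this::retrieveDeviceDetails)
        .groupBy(Message::getMessageType)
        .flatMap(group -> group.doOnNext(m -> dispatch(group.key(), m)))
        .subscribe();
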
Amardeep Singh Khera
@amardeepkhera

Hi, I am using reactor.retry.Retry from the io.projectreactor.addons:reactor-extra:3.3.4.RELEASE lib to include retries in my reactive chain. Everything works as expected, but when I run against BlockHound I get a blocking exception as follows. Is there any recommended way to fix this, or is it a bug in the library itself?

at java.io.FileInputStream.readBytes(FileInputStream.java)
at java.io.FileInputStream.read(FileInputStream.java:255)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at java.time.zone.TzdbZoneRulesProvider.load(TzdbZoneRulesProvider.java:187)
at java.time.zone.TzdbZoneRulesProvider.<init>(TzdbZoneRulesProvider.java:113)
at java.time.zone.ZoneRulesProvider$1.run(ZoneRulesProvider.java:157)
at java.security.AccessController.doPrivileged(Native Method)
at java.time.zone.ZoneRulesProvider.<clinit>(ZoneRulesProvider.java:144)
at java.time.ZoneRegion.ofId(ZoneRegion.java:120)
at java.time.ZoneId.of(ZoneId.java:411)
at java.time.ZoneId.of(ZoneId.java:359)
at java.time.ZoneId.of(ZoneId.java:315)
at java.util.TimeZone.toZoneId(TimeZone.java:556)
at java.time.ZoneId.systemDefault(ZoneId.java:274)
at reactor.scheduler.clock.SchedulerClock.of(SchedulerClock.java:166)
at reactor.retry.AbstractRetry.<init>(AbstractRetry.java:66)
at reactor.retry.DefaultRetry.<init>(DefaultRetry.java:48)
at reactor.retry.DefaultRetry.create(DefaultRetry.java:58)
at reactor.retry.Retry.allBut(Retry.java:107)

iron2414
@iron2414

Hi, I'm using Spring Framework's org.springframework.web.reactive.socket.WebSocketSession, and I create a reactor.core.publisher.UnicastProcessor to send messages. Like this:

publisher = UnicastProcessor.create();
session.send(publisher).subscribe();
// [...] stuff
WebSocketMessage message = new WebSocketMessage(WebSocketMessage.Type.BINARY, dataBuffer);
publisher.onNext(message);

It is working perfectly; however, I'm curious what happens if I produce too many messages and my network can't keep up with them.

  • How do I detect such a thing?
  • What happens to my application by default?
  • What can I do if that happens? Should I implement some kind of backpressure?

I tried to search on google, but no luck, so any help is appreciated

naveendahiya
@naveendahiya

For reactive GridFS: it seems to run out of memory. I am not able to apply backpressure... any hints please?
for (Map<String, Object> metadata : mdList) {
    saveObject(dataBuffer, metadata)
        .onErrorContinue((e, a) -> log.error(e.getMessage()))
        .subscribe(new BackpressureReadySubscriber());
}

public class BackpressureReadySubscriber<T> extends BaseSubscriber<T> {

int limit =100;
int factor = limit;
int delay = 5000;
int consumed;

@Override
protected void hookOnError(Throwable throwable) {
    log.error(throwable.getMessage());
}

@Override
public void hookOnSubscribe(Subscription subscription) {
   // log.info("Size of request"+ factor);
    // initial request, but why it is called everytime
    //further request data
    request(factor);
}

@Override
@SneakyThrows
public void hookOnNext(T value) {
    consumed++;
    log.info("Consumed"+consumed);
    if (consumed == limit) {
        consumed = 0;
        log.info("Sleeping for "+delay/1000+" sec");
        Thread.sleep(delay);
        log.info("woke up after "+delay/1000+" sec");
        request(factor);
    }
}

}

Erik Lumme
@lummememe_twitter
I saw a discussion from a year or two back about a way to set some default Context values for all publisher chains. This is what I came up with, not sure if I'm terribly ruining everything by doing so:
Hooks.onLastOperator("test-hook", publisher -> {
    if (publisher instanceof Mono) {
        return ((Mono<Object>) publisher).subscriberContext(SomeClass::initContext);
    }
    return ((Flux<Object>) publisher).subscriberContext(SomeClass::initContext);
});
pragnareddye
@pragnareddye

Hello! I am using reactor-kafka. I consume a batch of messages from Kafka and store it in the DB asynchronously, and the operation returns CompletableFutures.
This is what I have so far:

Flux<K> flux = receiver.receiveAtmostOnce();
flux
    .bufferTimeout(maxBatchSize, batchTimeout)
    .onBackpressureBuffer()
    .publishOn(Schedulers.boundedElastic())
    .doOnError(th -> {
        LOG.warn("Exception while handling records", th);
        latch.countDown();
    })
    .doOnCancel(() -> LOG.info("Kafka receiver stopped"))
    .subscribe(records -> {
        completableFutureArray = processRecords(records);
        // Blocking call that I want to avoid
        completableFutureArray.join();
    });

I understand using a blocking call is bad but

  1. If I don't block, how will back pressure work? Won't the subscriber continuously fetch more items before finishing processing of the existing items?
  2. Mine is a slow consumer fast producer problem and that is why back pressure is important.

Any help/suggestion would be appreciated
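
One non-blocking sketch (processRecords and the other names are from the snippet above; wrapping the result with CompletableFuture.allOf is an assumption about its return type): wrap the futures in a Mono and let concatMap process one batch at a time (with a small prefetch), so a slow consumer slows the pipeline down without blocking a thread.

receiver.receiveAtmostOnce()
        .bufferTimeout(maxBatchSize, batchTimeout)
        .publishOn(Schedulers.boundedElastic())
        // concatMap moves to the next batch only after this Mono completes
        .concatMap(records -> Mono.fromFuture(() ->
                CompletableFuture.allOf(processRecords(records))))
        .doOnError(th -> LOG.warn("Exception while handling records", th))
        .doOnCancel(() -> LOG.info("Kafka receiver stopped"))
        .subscribe();
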

natraj09
@natraj09
I have created a chain of fluxA.transform(performTaskA).transform(performTaskB).transform(persistData).transform(performTaskC). Is there a way to preserve the context of the output of performTaskA and make it available in performTaskC?
1 reply
aurokrish21
@aurokrish21

I need help reading field values from the ServerRequest object; this object holds only the "Part.class" file content.

When I iterate over the "partFile" object, it doesn't hold the form-field values.

public Mono<ServerResponse> getUploadFiles(ServerRequest request){

        return request.multipartData()
                .flux()
                .flatMap(map -> Flux.fromIterable(map.values()))
                .collectList()
                .flatMap((partFile) -> ServerResponse.ok()
                                                     .body(Mono.just(fileStorageService.storeZipFile(partFile.get(0), "", "")), String.class))
                .onErrorResume(
                        JsonProcessingException.class,
                        (e) -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                                             .contentType(MediaType.APPLICATION_JSON)
                                             .body(Mono.just(e.getMessage()), String.class));

    }
Along with the files we are sending form-field values, for example: name: XYZ, date: 2020-10-13.
How do I read those form fields? Could anyone please help me with this?
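
A sketch of reading those fields (the part names "name", "date", and "file" are assumptions, and storeZipFile is the method from the snippet above): multipartData() returns the files and the plain fields in the same MultiValueMap, with fields arriving as FormFieldPart, so they can be looked up by name and cast.

public Mono<ServerResponse> getUploadFiles(ServerRequest request) {
    return request.multipartData().flatMap(parts -> {
        String name = ((FormFieldPart) parts.getFirst("name")).value();
        String date = ((FormFieldPart) parts.getFirst("date")).value();
        Part file = parts.getFirst("file");
        // use the form-field values alongside the uploaded file
        return ServerResponse.ok()
                .body(Mono.just(fileStorageService.storeZipFile(file, name, date)), String.class);
    });
}
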
Prakhar Tandon
@tondon.prakhar.mountblue_gitlab
I need help converting a Mono<PojoType> to simply obtain the PojoType object without using block() on a reactor-http-epoll thread.
naveendahiya
@naveendahiya
How does backpressure work? Is Thread.sleep() needed to slow down the consumer?
Marcus Rodan
@Nerja

Hello, I'm trying to understand how gRPC Client Cancellation works when using https://github.com/salesforce/reactive-grpc. I have an example program that creates a simple service, something like the following:

@Slf4j
@GrpcService
public class Service extends ReactorServiceGrpc.ServiceImplBase {
  @Override
  public Mono<User> getUser(Mono<GetUserRequest> requestMono) {
    return requestMono
            .doOnNext(__-> log.info("Got a request"))
            .thenReturn(User.newBuilder().build())
            .delayElement(Duration.ofSeconds(5))
            .doOnTerminate(() -> log.info("Terminated"));
  }
}

I expect that I will see "Terminated" shortly after "Got a request" if I cancel the call from the client. However, I see the following output:

2020-10-25 16:17:09.633 [ault-executor-0] : Got request!
2020-10-25 16:17:14.634 [     parallel-2] : Terminated
2020-10-25 16:17:14.637 [     parallel-2] r.c.p.Operators : Operator called default onErrorDropped

io.grpc.StatusRuntimeException: CANCELLED: call already cancelled

Any ideas on why this happens and why I don't get the expected behavior?

Sagar Raj
@SagarrajRaj_twitter
Hello,
What is the right way to handle backpressure in async subscribers?
Eugene Kamenev
@eugene-kamenev

Hi! Dear Reactor team, I cannot understand why this test case does not pass:

        when:
        //def scheduler = VirtualTimeScheduler.getOrSet();
        def rp = Sinks.many().replay().limit(Duration.ofSeconds(1))
        def flux = rp.asFlux().doOnNext({
            log.info it.toString()
        })

        for (int i = 0; i < 5; i++) {
            rp.emitNext(i, Sinks.EmitFailureHandler.FAIL_FAST);
        }
        Thread.sleep(2000L)
        // scheduler.advanceTimeBy(Duration.ofSeconds(2))
        for (int i = 5; i < 10; i++) {
            rp.emitNext(i, Sinks.EmitFailureHandler.FAIL_FAST);
        }

        then:
        StepVerifier.create(flux)
                .expectNext(5,6,7,8,9)
                .thenCancel()
                .verify()

This test case passes only if I uncomment the virtual scheduler.

isuru-sam
@isuru-sam
Hi, why not have ServerResponse<Flux> in functional controllers?