naveendahiya
@naveendahiya

For reactive GridFS: it seems to run out of memory, and I am not able to apply backpressure... any hints, please?
for (Map<String, Object> metadata : mdList) {
    saveObject(dataBuffer, metadata)
            .onErrorContinue((e, a) -> log.error(e.getMessage()))
            .subscribe(new BackpressureReadySubscriber());
}

public class BackpressureReadySubscriber<T> extends BaseSubscriber<T> {

    int limit = 100;
    int factor = limit;
    int delay = 5000;
    int consumed;

    @Override
    protected void hookOnError(Throwable throwable) {
        log.error(throwable.getMessage());
    }

    @Override
    public void hookOnSubscribe(Subscription subscription) {
        // log.info("Size of request" + factor);
        // initial request, but why is it called every time?
        // further request data
        request(factor);
    }

    @Override
    @SneakyThrows
    public void hookOnNext(T value) {
        consumed++;
        log.info("Consumed" + consumed);
        if (consumed == limit) {
            consumed = 0;
            log.info("Sleeping for " + delay / 1000 + " sec");
            Thread.sleep(delay);
            log.info("woke up after " + delay / 1000 + " sec");
            request(factor);
        }
    }
}

Erik Lumme
@lummememe_twitter
I saw a discussion from a year or two back about a way to set some default Context values for all publisher chains. This is what I came up with, not sure if I'm terribly ruining everything by doing so:
Hooks.onLastOperator("test-hook", publisher -> {
    if (publisher instanceof Mono) {
        return ((Mono<Object>) publisher).subscriberContext(SomeClass::initContext);
    }
    return ((Flux<Object>) publisher).subscriberContext(SomeClass::initContext);
});
pragnareddye
@pragnareddye

Hello! I am using reactor-kafka. I consume a batch of messages from Kafka and store it in the DB asynchronously; the operation returns CompletableFutures.
This is what I have so far:

Flux<K> flux = receiver.receiveAtmostOnce();
flux
    .bufferTimeout(maxBatchSize, batchTimeout)
    .onBackpressureBuffer()
    .publishOn(Schedulers.boundedElastic())
    .doOnError(th -> {
        LOG.warn("Exception while handling records", th);
        latch.countDown();
    })
    .doOnCancel(() -> LOG.info("Kafka receiver stopped"))
    .subscribe(records -> {
        completableFutureArray = processRecords(records);
        // Blocking call that I want to avoid
        completableFutureArray.join();
    });

I understand using a blocking call is bad, but:

  1. If I don't block, how will backpressure work? Won't the subscriber continuously fetch more items before it finishes processing the existing ones?
  2. Mine is a slow-consumer, fast-producer problem, and that is why backpressure is important.

Any help/suggestions would be appreciated.
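
A non-blocking shape that preserves backpressure (an editor's sketch, not the poster's code; it assumes processRecords can be adapted to return a single CompletableFuture, e.g. via CompletableFuture.allOf): wrap the future in a Mono and let flatMap's concurrency argument bound the number of in-flight batches. flatMap only requests more batches from upstream as inner Monos complete, so no explicit blocking is needed:

receiver.receiveAtmostOnce()
        .bufferTimeout(maxBatchSize, batchTimeout)
        // at most 2 batches in flight; upstream demand is capped accordingly,
        // which is what gives you backpressure without join()
        .flatMap(records -> Mono.fromFuture(() -> processRecords(records)), 2)
        .doOnError(th -> LOG.warn("Exception while handling records", th))
        .subscribe();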

natraj09
@natraj09
I have created a chain: fluxA.transform(performTaskA).transform(performTaskB).transform(persistData).transform(performTaskC). Is there a way to preserve the output of performTaskA as context and make it available in performTaskC?
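The Reactor Context will not help here: it is populated at subscription time (subscriberContext/contextWrite), not with values produced mid-chain at runtime. The usual workaround, sketched below with a hypothetical ResultA type, is to carry taskA's output alongside the data in the elements themselves:

// a sketch: keep taskA's result next to each element as a Tuple2
Flux<Tuple2<ResultA, ResultA>> chain = fluxA
        .transform(performTaskA)
        .map(a -> Tuples.of(a, a));   // second slot reserved for taskC
// performTaskB/persistData then operate on getT1() and pass getT2()
// through unchanged, so performTaskC can read both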
aurokrish21
@aurokrish21

I need help reading form-field values from the ServerRequest object; the object I get back holds only Part (file) content.

When I iterate the partFile list, it doesn't hold the form-field values.

public Mono<ServerResponse> getUploadFiles(ServerRequest request){

        return request.multipartData()
                .flux()
                .flatMap(map -> Flux.fromIterable(map.values()))
                .collectList()
                .flatMap((partFile) -> ServerResponse.ok()
                                                     .body(Mono.just(fileStorageService.storeZipFile(partFile.get(0), "", "")), String.class))
                .onErrorResume(
                        JsonProcessingException.class,
                        (e) -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                                             .contentType(MediaType.APPLICATION_JSON)
                                             .body(Mono.just(e.getMessage()), String.class));

    }
Along with the files, we are sending form-field values, for example: name: XYZ, date: 2020-10-13.
How do I read those form fields? Could anyone please help me with this?
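
In WebFlux, the form fields of a multipart request arrive in the same MultiValueMap as the file parts, but as FormFieldPart entries, so they can be looked up by name and cast (a minimal sketch; the part names are taken from the example above):

// multipartData() maps part names to Parts; form fields are FormFieldParts,
// whose value() returns the text content
return request.multipartData().flatMap(parts -> {
    String name = ((FormFieldPart) parts.getFirst("name")).value();
    String date = ((FormFieldPart) parts.getFirst("date")).value();
    Part file = parts.getFirst("file");   // "file" is an assumed part name
    // ... hand the file to the storage service, then build the response
    return ServerResponse.ok().bodyValue(name + " " + date);
});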
Prakhar Tandon
@tondon.prakhar.mountblue_gitlab
I need help converting a Mono<PojoType> into a plain PojoType object without using block() on a reactor-http-epoll thread.
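
There is no way to extract the value without either blocking or composing; on an event-loop thread the usual answer is to keep the work inside the pipeline (a sketch; doSomethingWith is hypothetical):

// instead of PojoType pojo = mono.block(); ...
monoOfPojo
        .map(pojo -> doSomethingWith(pojo))   // transform the value in place
        .subscribe();
// Reactor rejects block() on reactor-http-epoll threads (they implement
// NonBlocking), which is why the value has to be consumed inside the chain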
naveendahiya
@naveendahiya
How does backpressure work? Is Thread.sleep() needed to slow down the consumer?
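
No sleep is needed: backpressure is exercised by when request(n) is called, not by stalling a thread. A minimal sketch of a BaseSubscriber that paces itself one element at a time (process is a hypothetical handler):

public class OneByOneSubscriber<T> extends BaseSubscriber<T> {

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(1);        // prime the stream with a single element
    }

    @Override
    protected void hookOnNext(T value) {
        process(value);    // do the (possibly slow) work
        request(1);        // only now ask the publisher for the next one
    }
}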
Marcus Rodan
@Nerja

Hello, I'm trying to understand how gRPC Client Cancellation works when using https://github.com/salesforce/reactive-grpc. I have an example program that creates a simple service, something like the following:

@Slf4j
@GrpcService
public class Service extends ReactorServiceGrpc.ServiceImplBase {
  @Override
  public Mono<User> getUser(Mono<GetUserRequest> requestMono) {
    return requestMono
            .doOnNext(__-> log.info("Got a request"))
            .thenReturn(User.newBuilder().build())
            .delayElement(Duration.ofSeconds(5))
            .doOnTerminate(() -> log.info("Terminated"));
  }
}

I expect to see "Terminated" shortly after "Got a request" if I cancel the call from the client. I, however, see the following output:

2020-10-25 16:17:09.633 [ault-executor-0] : Got request!
2020-10-25 16:17:14.634 [     parallel-2] : Terminated
2020-10-25 16:17:14.637 [     parallel-2] r.c.p.Operators : Operator called default onErrorDropped

io.grpc.StatusRuntimeException: CANCELLED: call already cancelled

Any ideas on why this happens and why I don't get the expected behavior?

Sagar Raj
@SagarrajRaj_twitter
Hello,
What is the right way to handle backpressure in async subscribers?
Eugene Kamenev
@eugene-kamenev

Hi! Dear Reactor team, I cannot understand why this test case does not pass:

        when:
        //def scheduler = VirtualTimeScheduler.getOrSet();
        def rp = Sinks.many().replay().limit(Duration.ofSeconds(1))
        def flux = rp.asFlux().doOnNext({
            log.info it.toString()
        })

        for (int i = 0; i < 5; i++) {
            rp.emitNext(i, Sinks.EmitFailureHandler.FAIL_FAST);
        }
        Thread.sleep(2000L)
        // scheduler.advanceTimeBy(Duration.ofSeconds(2))
        for (int i = 5; i < 10; i++) {
            rp.emitNext(i, Sinks.EmitFailureHandler.FAIL_FAST);
        }

        then:
        StepVerifier.create(flux)
                .expectNext(5,6,7,8,9)
                .thenCancel()
                .verify()

This test case passes only if I uncomment the virtual scheduler.

isuru-sam
@isuru-sam
Hi, why is there no ServerResponse<Flux> in functional controllers?
gtay003
@gtay003
Hi team, I'm looking for something that works a bit like Flux.sample(Duration), but which emits an element immediately if no previous element was received in the preceding sample window, and then resets the sample window from that point. As far as I can see, the Flux.sample(Duration) operator will always wait until the current sample window expires before emitting, even if nothing was emitted in the previous window. Does something like this already exist? I suspect it might be possible to implement what I need with the Flux.sample(Publisher) method but I'm not sure how to approach it, so any pointers here would be much appreciated. Thanks!
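One built-in worth checking before building a custom sample(Publisher) companion (it may or may not match the exact semantics wanted): Flux.sampleFirst samples on the leading edge, i.e. an element arriving after a quiet period is emitted immediately and starts a fresh window:

// the first element is emitted at once; subsequent ones are dropped until
// the window elapses, then the next arrival again passes through immediately
source.sampleFirst(Duration.ofSeconds(1));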
Omid Barat
@obarat
How can I make a Map<String, Mono<String>> into a Mono<Map<String, String>>?
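A sketch of the usual shape: resolve each value Mono, pair it with its key, and collect back into a map:

Mono<Map<String, String>> result = Flux.fromIterable(map.entrySet())
        .flatMap(e -> e.getValue().map(v -> Tuples.of(e.getKey(), v)))
        .collectMap(Tuple2::getT1, Tuple2::getT2);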
bruto1
@bruto1
FluxElapsed operator has a single field as its state (lastTime) and it's not volatile. It's written to in onSubscribe and onNext methods which are guaranteed to be invoked sequentially, but not necessarily from the same thread. Where exactly is the memory barrier which ensures correctness of reads and writes to this field?
mplain
@mplain

Hello!
I've discovered something strange

Server side:

        GET("/flux") {
            val flux = Flux.range(0, 5)
//                    .map(Int::toString)
                    .delayElements(Duration.ofSeconds(1))
            ServerResponse.ok().body(flux)
        }

Client side:

        WebClient.create("http://localhost:8080")
                .get()
                .uri("/flux")
                .exchange()
                .doOnNext { logger.info(it.statusCode().toString()) }
                .doOnNext { it.headers().asHttpHeaders().forEach { logger.info("${it.key}: ${it.value}") } }
                .flatMapMany { it.bodyToFlux<String>() }
                .subscribe(logger::info)

If the server returns a flux of integers,
then the response content-type is set to application/json;
the client waits for 5 seconds, then receives the response and body together;
the body is an array of integers:

2020-11-01 23:44:00  - GET /flux
2020-11-01 23:44:06  - 200 OK
2020-11-01 23:44:06  - transfer-encoding: [chunked]
2020-11-01 23:44:06  - Content-Type: [application/json]
2020-11-01 23:44:06  - [0,1,2,3,4]

If the server returns a flux of strings,
then the response content-type is set to text/plain;
the client receives the response immediately, then waits for 5 seconds and receives the response body;
the body is a concatenated string:

2020-11-01 23:45:07  - GET /flux
2020-11-01 23:45:08  - 200 OK
2020-11-01 23:45:08  - transfer-encoding: [chunked]
2020-11-01 23:45:08  - Content-Type: [text/plain;charset=UTF-8]
2020-11-01 23:45:12  - 01234
Why this difference in behavior?
If the server returns a flux of strings and I manually set the content-type to application/json, it makes no difference: the client still receives the response first, and the body 5 seconds later, as a concatenated string.
mplain
@mplain
what do I need to do to make it return a pretty array of strings?
mplain
@mplain
and what's the principal difference between returning a Flux<String> and a Mono<List<String>>?
(provided I don't intend to use content-type application/stream+json or text/event-stream)
iron2414
@iron2414

Hi there, I'm trying to implement a custom queue for a UnicastProcessor inside a Spring application, to handle bursts (I just want to drop the incoming message if there are too many). My problem is, if I do

UnicastProcessor<WebSocketMessage> publisher = UnicastProcessor.create(myCustomqueue);

then my application totally freezes over time, and NOTHING works: there are no more logs or anything like that. However, if I create a UnicastProcessor with the no-argument create(), then everything works fine.

Here's my custom queue implementation; it's really simple:

public class LimitedQueue<E> extends ArrayDeque<E> {

    private final int limit;

    public LimitedQueue(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean offer(E o) {
        if (size() >= limit) {
            return false;
        }
        return super.offer(o);
    }
}

I found a really similar question from 8 months ago here as well:
https://stackoverflow.com/questions/60193240/reactor-unicastprocessor-drop-on-backpressure-when-queue-is-full

Could anyone help me out?

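Possibly relevant: UnicastProcessor's default queue is a thread-safe SPSC queue, while ArrayDeque is not thread-safe, which could explain the freeze. A sketch of a drop-on-overflow alternative that avoids a custom queue entirely (registerCallback stands in for however the messages are produced):

// Flux.create with OverflowStrategy.DROP discards elements when the
// downstream cannot keep up, instead of queueing them unboundedly
Flux<WebSocketMessage> messages = Flux.create(
        sink -> registerCallback(sink::next),   // hypothetical producer hook
        FluxSink.OverflowStrategy.DROP);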
Fırat Sivrikaya
@firatsivrikaya
Hey everyone, I have just started to learn Spring Reactor. I've been reading the reference documentation, but I am having a hard time understanding the concepts, and I find the reference a little complex for a beginner like me. I also couldn't find real-world examples on the internet; can you please suggest some resources for better understanding Reactor and WebFlux? All the examples that I see on the internet mostly consist of creating a flux and then subscribing to it. Maybe someone can share open-source real-world projects with tests that I could dive into. I am losing a lot of motivation here :)
Violeta Georgieva
@violetagg
@firatsivrikaya What exactly do you need? There are a lot of resources on the web. Josh Long has a lot of videos that you can check, for example https://youtu.be/_LR0Cxnn-kw?t=1649
Mikael Elm
@mickeelm
Also, if you haven't taken that approach already: read the reference patiently, and don't skip parts or rush through it. IMO it is a great resource then. But I agree, there are many good videos out there from conference talks covering introductions to WebFlux/Reactor.
iron2414
@iron2414

Hi everyone, I'm trying Reactor Kafka (I'm basically a newbie at it) and got the samples working. However, the consumer keeps spamming the following logs:

2020-11-04 10:01:36.007+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.FetchSessionHandler              : [Consumer clientId=sample-consumer, groupId=sample-group] Node 0 sent an incremental fetch response for session 569816377 with 0 response partition(s), 1 implied partition(s)
2020-11-04 10:01:36.008+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.c.i.Fetcher                      : [Consumer clientId=sample-consumer, groupId=sample-group] Added READ_UNCOMMITTED fetch request for partition test2-0 at position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=abbelfynsrv:9092 (id: 0 rack: null), epoch=0}} to node abbelfynsrv:9092 (id: 0 rack: null)
2020-11-04 10:01:36.008+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.FetchSessionHandler              : [Consumer clientId=sample-consumer, groupId=sample-group] Built incremental fetch (sessionId=569816377, epoch=35) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
2020-11-04 10:01:36.008+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.c.i.Fetcher                      : [Consumer clientId=sample-consumer, groupId=sample-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(test2-0)) to broker abbelfynsrv:9092 (id: 0 rack: null)

Is there a way to disable this? I can show my consumer's code if needed.
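
Those lines come from the Apache Kafka client itself at DEBUG level, so the usual fix is logging configuration rather than anything in reactor-kafka; a sketch for a Spring Boot application.properties:

# raise the level for the Kafka client packages to silence the fetch chatter
logging.level.org.apache.kafka=INFO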

Jens Geiregat
@jgrgt

Hi all,

I'm developing a Spring-Boot Reactor based application that we're currently performance testing. The basic data flow is:

  1. Fetch a lot of data from the DB using r2dbc-postgresql
  2. Do some heavy calculations on that data
  3. Return an aggregation of that data

Now we're having issues. When the system is under load, our DB reports responding within <200 ms in most cases, while a lot of our trace data shows that the code only receives the first result from the DB after ~1 s. And I have no clue where that time difference is coming from.

Some things I have tried:

  • moving the CPU-heavy calculations to the parallel scheduler
  • adding artificial buffering to the stream to be able to measure throughput and avoid backpressure problems; still, the DB code seems extremely slow

The only thing left for me to try is to throw out the r2dbc-postgresql repository, replace it with plain old JDBC, and just schedule that on a boundedElastic scheduler?

Any idea is welcome...
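If the JDBC fallback is attempted, the standard shape is to wrap the blocking call and shift it to boundedElastic (a sketch; fetchAllBlocking is a hypothetical blocking repository call):

// wrap a blocking JDBC query so it never runs on an event-loop thread
Flux<MyEntity> rows = Flux
        .defer(() -> Flux.fromIterable(fetchAllBlocking()))
        .subscribeOn(Schedulers.boundedElastic());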

ChandanPadalkar
@ChandanPadalkar
Hi guys,
I am developing an application using Spring WebFlux with Cassandra as the database. I have a use case where I want to sort data on a particular field, fetch the top 10 records, and display them on the UI. Due to Cassandra limitations I cannot do this directly in a database query.
I have tried fetching all the data from the table, sorting it in the Java layer, and then returning the top 10 records to the UI. But since I have millions of records in the table, fetching all the data and sorting it in the Java layer takes a significant amount of time.
Is there a better approach to achieve this in the Java layer?
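One common improvement, sketched with a placeholder repository and comparator: keep a bounded min-heap of the current top 10 while streaming, so only 10 rows are ever held in memory instead of the whole table:

Mono<List<Record>> top10 = repository.findAll()            // Flux<Record>
        .reduce(new PriorityQueue<Record>(10, comparator), (heap, r) -> {
            heap.offer(r);
            if (heap.size() > 10) {
                heap.poll();           // evict the smallest, keeping the top 10
            }
            return heap;
        })
        .map(heap -> {
            List<Record> result = new ArrayList<>(heap);
            result.sort(comparator.reversed());   // best first
            return result;
        });

This still streams every row out of Cassandra, but it avoids materializing and sorting millions of records at once.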
Knut Schleßelmann
@kschlesselmann

I've got a library that parses a lot of XML data, where I have to provide a callback for what should be done with the parsed data. Now I'm trying to incorporate this lib into my reactive application like this:

private fun InputStream.parse(): Flux<ParsedProduct> = Flux.create { sink ->

    val requested = AtomicLong(0)

    sink.onRequest {
        if (logger.isTraceEnabled) {
            logger.trace("Requested ParsedProducts", kv("number", it))
        }

        requested.addAndGet(it)
    }

    val parser = XmlArchiveParser(ProductParser()) {
        if (logger.isTraceEnabled) {
            logger.trace("Parsed product", kv("productId", it.productId))
        }

        while (requested.get() <= 0) {
            logger.trace { "Waiting for requests" }
            Thread.sleep(5)
        }

        sink.next(it)
        requested.decrementAndGet()
    }

    parser.importAndProcessProducts(this)
    sink.complete()
}

Is this a proper way to implement this hybrid push/pull backpressure mechanism? If so, which thread currently gets blocked by the Thread.sleep? Should I subscribe this stuff on boundedElastic()?

mplain
@mplain
I want to have two router endpoints: one POST that receives some data and sends it to a "channel", and one GET that sends out everything from that channel as Server-Sent Events.
I've seen such channels in Go.
Is there something similar in Reactor?
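Reactor's closest analogue to a Go channel is a Sinks.Many used as an in-process bus (a sketch; the names are illustrative):

// shared "channel": multicast so every SSE subscriber sees new events
Sinks.Many<String> channel = Sinks.many().multicast().onBackpressureBuffer();

// the POST handler pushes into the channel
channel.tryEmitNext(payload);

// the GET handler exposes it as a Flux, e.g. for a text/event-stream body
Flux<String> events = channel.asFlux();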
mplain
@mplain

I'm using Spring WebClient (based, AFAIK, on reactor-netty's HttpClient?).
I'm sending a stream of 5 values with a one-second delay (Flux<Object>, content-type = application/x-ndjson) to the server, and receiving the response as a Flux as well.
I expect to see something like this in the logs:

object #1 sent
response #1 received
object #2 sent
response #2 received
...

Objects are sent one by one, and responses are also received one by one,
but WebClient starts receiving the response only after it has finished sending the request.
I've found a similar question on SO: https://stackoverflow.com/questions/52113235
In it, @bclozel says that Spring WebClient and Reactor Netty HttpClient only start processing the response after they have finished processing the request.
Also, @violetagg told me that if the Netty server sends a response before it has finished processing the request body, then the remaining request body is discarded (?), as per the HTTP spec (?).
Could you please confirm that this is the way WebFlux and Reactor Netty work by design?

Interestingly, another answer on SO says that the Jetty-based WebClient works differently: it is able to start receiving the response before it has finished sending the request.
Is the restriction on the client side, the server side, or both?

Violeta Georgieva
@violetagg

@mplain

Also, @violetagg told me that if the Netty server sends a response before it has finished processing the request body, then the remaining request body is discarded (?), as per the HTTP spec (?)

And this is true if you finalise the response, not if you start writing the response.

For example, this code will start echoing the received incoming data and will continue to do so until the last message from the client is received:
HttpServer server =
        HttpServer.create()
                  .port(PORT)
                  .route(r -> r.post("/echo",
                          (req, res) -> res.header(CONTENT_TYPE, TEXT_PLAIN)
                                           .send(req.receive().retain())));
Violeta Georgieva
@violetagg
And what @bclozel replied in the mentioned SO question is very true: there is no guarantee that the sequence you expect above will happen, especially nowadays with so many proxies and load balancers between the client and the server.

then the remaining request body is discarded (?), as per the HTTP spec (?)

https://tools.ietf.org/html/rfc7230#section-6.3

A server MUST read
   the entire request message body or close the connection after sending
   its response, since otherwise the remaining data on a persistent
   connection would be misinterpreted as the next request.
mplain
@mplain
thank you!
I'm trying out reactor-kafka with BlockHound
BlockHound barks at KafkaConsumer.poll being a blocking method
I've found a GitHub issue about it: reactor/reactor-kafka#63
should I move that flux to another scheduler to make the dog happy, or is there no point really?
Knut Schleßelmann
@kschlesselmann
I have a publisher that should only execute after a Lock has been acquired, and after execution the Lock has to be released. Sadly, doFinally executes after the completion of the Publisher, so there is no way (that I know of) to test this behavior. What would you suggest for handling this?
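The canonical resource-scoped pattern here is Flux.using (or Mono.using), which ties acquisition and release to the subscription lifecycle; cleanup also runs on error and cancel, which makes it easier to test. A sketch with hypothetical lock methods:

Flux<String> guarded = Flux.using(
        () -> acquireLock(),          // resource supplier, runs per subscription
        lock -> protectedPublisher(), // the publisher guarded by the lock
        lock -> lock.release());      // cleanup on complete, error, or cancel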
Jeevjyot Singh Chhabda
@jeevjyot
Hi,
I have a List<String> and a Mono<Map<String, Object>>. I want to look up each entry of the List<String> in the Mono<Map<String, Object>>. What is the right way to achieve this in reactive programming?
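A sketch of one way: unwrap the map once with flatMapMany, then filter the keys against it:

Flux<Object> found = mapMono.flatMapMany(map ->
        Flux.fromIterable(keys)
            .filter(map::containsKey)
            .map(map::get));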
Rahul Narula
@ranarula

Hi, I have a handler method in which I need to persist data to the DB and, once it is persisted, send out some events (an HTTP call via the reactive WebClient) with the persisted data. I want to return control as soon as the DB operation is completed; sending the events needs to happen as a background job. I am having trouble assembling the Mono operator chain for this. Here is what I have:

Mono<ServerResponse> handleRequest(ServerRequest request) {
    var persistedMono = Mono.just("Hello")
        .flatMap(this::persistData)
        .doOnError(this::sendErrorEvent);

    persistedMono.publishOn(Schedulers.parallel())
        .doOnSuccess(this::sendEvents).subscribe();

    return persistedMono.flatMap(result -> ServerResponse.ok()
        .body(BodyInserters.fromValue(result)));
  }

The problem here is that persistedMono is subscribed twice, so persistData is called twice.

How can I address this and send the events in the background?

Thanks in advance for any suggestions/solutions.

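One minimal fix (a sketch): cache() the persistence Mono so the two subscriptions share a single persistData execution:

var persistedMono = Mono.just("Hello")
        .flatMap(this::persistData)
        .doOnError(this::sendErrorEvent)
        .cache();   // both subscriptions now share one execution

Alternatively, the whole flow can stay in one chain, firing the event from a side-effect operator such as doOnNext and returning that single Mono.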
rbrose
@rbrose
How can I solve this with only one SELECT query?
public Flux<TopicCommentResponse> topicComments(String language, long topicId) {
    return databaseClient.execute(
            "SELECT tc.*, u.username FROM topic_comment tc JOIN users u ON tc.user_id = u.id " +
            "WHERE tc.topic_id = :topicId AND tc.parent_id IS NULL ORDER BY tc.created_at DESC")
            .bind("topicId", topicId)
            .as(TopicCommentResponse.class)
            .fetch()
            .all()
            .flatMap(comment -> Mono.just(comment)
                    .zipWith(databaseClient.execute(
                            "SELECT tc.*, u.username FROM topic_comment tc JOIN users u ON tc.user_id = u.id " +
                            "WHERE tc.topic_id = :topicId AND tc.parent_id = :parentId ORDER BY tc.created_at DESC")
                            .bind("topicId", topicId)
                            .bind("parentId", comment.getId())
                            .as(TopicCommentResponse.class)
                            .fetch()
                            .all()
                            .collectList())
                    .map(tupla -> tupla.getT1().withAnswers(tupla.getT2())));
}
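A sketch of a one-query approach (assuming TopicCommentResponse exposes getId()/getParentId() alongside the withAnswers(...) used above): fetch every comment for the topic at once, then attach children to parents in memory:

return databaseClient.execute(
        "SELECT tc.*, u.username FROM topic_comment tc JOIN users u ON tc.user_id = u.id " +
        "WHERE tc.topic_id = :topicId ORDER BY tc.created_at DESC")
        .bind("topicId", topicId)
        .as(TopicCommentResponse.class)
        .fetch()
        .all()
        .collectList()
        .flatMapMany(all -> {
            Map<Long, List<TopicCommentResponse>> byParent = all.stream()
                    .filter(c -> c.getParentId() != null)
                    .collect(Collectors.groupingBy(TopicCommentResponse::getParentId));
            return Flux.fromStream(all.stream()
                    .filter(c -> c.getParentId() == null)
                    .map(c -> c.withAnswers(byParent.getOrDefault(c.getId(), List.of()))));
        });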
Dan Cohen-Smith
@dancohensmith
hey, I see a new reactor-kafka release
does this mean the backpressure bug is fixed?
does it actually respect backpressure now?
R2J
@rajeev-jha1988
Does Spring R2DBC provide support for JPA Specification-style queries?
Mritunjay Dubey
@mddubey

Hello people!

I am trying to implement reactive RabbitMQ in my Spring WebFlux application, but I am facing issues. Following the tutorial from the Reactor RabbitMQ reference guide, this is what I am trying to do:

sender.send(Mono.just(message))
        .doOnSuccess(a -> System.out.println("Delivered"))
        .doOnError(e -> logger.error("error", e))
        .subscribe();

Please let me know what might be going wrong! I have not been able to find a solution on the internet.

Rob Gillen
@borgille
Hi everyone! I'm trying to wrap my head around something that is probably simple, but I haven't found a way to do it, and maybe there is a better way than I am thinking of. I would like to collect the values emitted by a Flux into a PriorityQueue to order them before emitting them to downstream subscribers. This seems like it could be achieved through a custom Processor, but there seems to be a consensus discouraging that. If not via custom Processor, then how?
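A processor-free sketch: collect the elements into a PriorityQueue and re-expand it once the source completes (fully ordering a stream inherently requires seeing all of it first):

Flux<Integer> ordered = source
        .collect(() -> new PriorityQueue<Integer>(), PriorityQueue::offer)
        .flatMapMany(heap -> Flux.fromStream(
                Stream.generate(heap::poll).limit(heap.size())));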
vireshwali
@vireshwali

Hi everyone. I am new to Spring Reactor and need some help implementing a simple use case. I have two monos (mono1 and mono2) of the same type, Mono<List<Type1>>, where Type1 is something like this:

public class ErrorCountsByShortNameDto implements Serializable {
    private String errorShortName;
    private String deploymentName;
    private Long errorCount;
}

Now I need to loop over the lists in the two monos and match the Type1 items whose errorShortName is the same. Can anyone help point me in the right direction, or to an older message here that does this?
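
A sketch of one way to join them (assuming Lombok-style getters on the DTO): zip the two monos, then match on errorShortName in memory:

Mono<List<Tuple2<ErrorCountsByShortNameDto, ErrorCountsByShortNameDto>>> matched =
        mono1.zipWith(mono2, (list1, list2) -> list1.stream()
                .flatMap(a -> list2.stream()
                        .filter(b -> a.getErrorShortName().equals(b.getErrorShortName()))
                        .map(b -> Tuples.of(a, b)))
                .collect(Collectors.toList()));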

vireshwali
@vireshwali
any help for ^^ one?