sjuyal12
@sjuyal12
I am getting a "Multiple subscribers are not supported for KafkaReceiver flux" error when I open my application in two different browsers
1 reply
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(kafkaReceiver.receiveAutoAck()
            .concatMap(r -> r)
            .map(record -> {
                return record.value();
            })
            .map(webSocketSession::textMessage))
            .and(webSocketSession.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .map(this::sendToKafka)
                    .log());
}
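A possible way around this (a sketch, not a verified fix): subscribe to the KafkaReceiver once and multicast the resulting flux with share(), so each WebSocket session subscribes to the shared stream instead of the receiver itself. The sharedKafkaMessages field name is illustrative; with share(), sessions that connect later only see messages emitted after they subscribe, and replay(...) would be an alternative if history matters.

// Multicast the receiver flux once; each session subscribes to the shared
// stream rather than to the KafkaReceiver directly.
private final Flux<String> sharedKafkaMessages = kafkaReceiver.receiveAutoAck()
        .concatMap(batch -> batch)          // flatten the per-poll fluxes
        .map(ConsumerRecord::value)
        .share();                           // multicast to many subscribers

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(sharedKafkaMessages.map(webSocketSession::textMessage))
            .and(webSocketSession.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .map(this::sendToKafka)
                    .log());
}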
Oemer Genc
@oemergenc

Hi, I am trying to read a specific number of messages from each assigned partition of a consumer. For some reason I cannot get it to work correctly; here is my code:

    fun receive(
        topic: String,
        consumerConfig: Map<String, String>
    ): Flux<ReceiverRecord<String, String>> {
        val scheduler = Schedulers.newBoundedElastic(60, Integer.MAX_VALUE, "sample", 60, true)
        val receiverOptions = ReceiverOptions.create<String, String>(consumerConfig)
            .subscription(mutableListOf(topic))
            .commitInterval(Duration.ZERO)
        return KafkaReceiver.create(receiverOptions).receive()
            .groupBy { m -> m.receiverOffset().topicPartition() }
            .flatMap {
                it.publishOn(scheduler)
                    .map { r -> r }
                    .concatMap { record ->
                        record.receiverOffset().commit()
                        record.toMono()
                    }
                    .take(10)
            }
    }

What happens is that 10 messages are read from 2 partitions, but none from the others. The topic has 40 partitions, so for some reason not all partitions are processed.
Removing the take(10) results in all messages from all partitions being read, so I guess the basic idea is correct. Any ideas?
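One guess, offered as an untested sketch: cancelling each group with take(10) interacts badly with groupBy's bounded prefetch, because records belonging to already-cancelled groups can fill the internal buffers and stall the remaining partitions. An alternative that avoids cancelling groups is to count records per partition and filter instead (illustrative only):

// Keep a counter per partition and drop records beyond the first 10,
// instead of cancelling each GroupedFlux with take(10).
Map<TopicPartition, AtomicInteger> seen = new ConcurrentHashMap<>();
Flux<ReceiverRecord<String, String>> firstTenPerPartition =
        KafkaReceiver.create(receiverOptions).receive()
                .filter(r -> seen
                        .computeIfAbsent(r.receiverOffset().topicPartition(),
                                tp -> new AtomicInteger())
                        .incrementAndGet() <= 10);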

Eduardo Cucharro
@eduardocucharro_twitter
Hello Guys, is there a tool or way to check if a memory leak is happening inside an app using reactor-netty?
37 replies
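One standard starting point (hedged; this is Netty's built-in buffer leak detector rather than anything Reactor-specific): run the app with leak detection enabled and watch the logs for LEAK reports:

// JVM flag enabling Netty's ResourceLeakDetector at its most thorough level
// (other levels: disabled, simple, advanced)
-Dio.netty.leakDetection.level=paranoid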
Eduardo Cucharro
@eduardocucharro_twitter
[several screenshots attached]
@violetagg

mplain
@mplain
hi there
Hi folks. Does anyone have an explanation for why the following example gives the IntelliJ warning "Calling 'Subscribe' in non-blocking scope" only for flatMap and not for map?
Mono<String> doSomething = Mono.just("doSomething");

Mono.just("doIt")
    .map(string -> {
        doSomething.subscribe();
        return string;
    })
    .flatMap(string -> {
        doSomething.subscribe();
        return Mono.just(string);
    });
@adrianhj88 Calling subscribe within a reactive operator does not give any guarantee about doSomething being finished before the operator is done. In this case it might not be a big deal because you're doing Mono.just. But even if you're aware of that limitation, it could behave in strange ways during tests: let's say a test checks the behavior of your reactive pipeline. The JVM might exit before doSomething is done, since nothing is waiting for its completion signal.
Because of that, and because newcomers tend to call subscribe instead of chaining operations with .then(), IntelliJ IDEA is treating that as a warning (and it was our advice to do so).
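To make that concrete, the version the warning steers towards chains doSomething into the pipeline instead of subscribing to it inside an operator (a minimal sketch):

// flatMap waits for doSomething to complete before emitting, so the inner
// work stays inside the pipeline's lifecycle instead of escaping it.
Mono.just("doIt")
    .flatMap(string -> doSomething.thenReturn(string));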

@bclozel Thanks. It is supposed to be fire and forget. In reality the fire and forget does more than Mono.just with a string. The JVM is not supposed to exit, but it would be okay to lose doSomething once in a while. This does not really work with then(), does it?

Why does the warning only appear in the flatMap?

I don't know why it appears only in one case; IMO it should be both.
Even if your operation is fire and forget, it's probably doing IO - if you ever want to test specific use cases, propagate Reactor contexts, etc. - making it part of the pipeline is still a good idea I think.
Now depending on what your actual use case looks like, I'm sure people here could advise better choices of operators. I've suggested then() because it's the first one that came to mind.
in my case, I'm defining a router bean using the 'router' function
like this
@Bean
fun audit(
    repository: AuditEventRepository
) = router {
    POST("/audit") { request ->
        Mono.defer { request.bodyToMono<AuditEvent>().map { repository.save(it) } }
            .retryWhen(Retry.backoff(3, Duration.ofMinutes(1)))
            .subscribeOn(Schedulers.elastic())
            .subscribe(
                { logger.info("Saved audit entity: $it") },
                { logger.error("Error saving audit entity", it) }
            )
        ServerResponse.ok().build()
    }
}
mplain
@mplain
so, IDEA warns me that I'm Calling 'Subscribe' in non-blocking scope
but if I change it to then(), it warns me that the value is never used as a publisher
what is the right way to proceed here?
(I want to receive a request, asynchronously write it into the database, and immediately send a 200 OK response)
Violeta Georgieva
@violetagg
@mplain one thing I want to mention. When Reactor Netty starts receiving the incoming data (that's for the server), Reactor Netty buffers it and Spring Framework decoders start to consume it. If you return the response immediately, we will clean up all buffers for the incoming data, and even if it is chunked we will stop consuming the incoming chunks.
So basically I’m wondering whether the sample above is working as expected
mplain
@mplain
I need a minute to process and try to understand that...
you're saying that as soon as I send a server response, the body disappears, and I won't be able to process it?
so I need to receive the body first, then send the server response, then process the body and make an async call to the database?
if so, then the correct way to process this request would be... something like this?
@Bean
fun audit(
    repository: AuditEventRepository
) = router {
    POST("/audit") { request ->
        request.bodyToMono<AuditEvent>()
            .doOnNext {
                Mono.fromCallable { repository.save(it) }
                    .retryWhen(Retry.backoff(3, Duration.ofMinutes(1)))
                    .subscribeOn(Schedulers.elastic())
                    .subscribe(
                        { logger.info("Saved audit entity: $it") },
                        { logger.error("Error saving audit entity", it) }
                    )
            }
            .flatMap { ServerResponse.ok().build() }
    }
}
Violeta Georgieva
@violetagg
@mplain Well you have this from the RFC https://tools.ietf.org/html/rfc7230#section-6.3
A server MUST read
   the entire request message body or close the connection after sending
   its response, since otherwise the remaining data on a persistent
   connection would be misinterpreted as the next request.
mplain
@mplain
interesting, thank you
I'm still not sure what's the right way to do an async task inside a publisher though =_=
Violeta Georgieva
@violetagg
instead of request can’t you ask Spring Framework to inject the body?
Mmm you are using functional ...
mplain
@mplain
by inject you mean @RequestBody ?
Violeta Georgieva
@violetagg
yes
mplain
@mplain
I'd rather like to understand better how Reactor works
I do not fully understand this:
@adrianhj88 Calling subscribe within a reactive operator does not give any guarantee about doSomething being finished before the operator is done. In this case it might not be a big deal because you're doing Mono.just. But even if you're aware of that limitation, it could behave in strange ways during tests: let's say a test checks the behavior of your reactive pipeline. The JVM might exit before doSomething is done, since nothing is waiting for its completion signal.
Because of that, and because newcomers tend to call subscribe instead of chaining operations with .then(), IntelliJ IDEA is treating that as a warning (and it was our advice to do so).
basically, let's pretend that I don't need the request body at all
@Bean
fun audit(
    repository: AuditEventRepository
) = router {
    POST("/audit") { request ->
        Mono.defer { Mono.just(repository.save("hello")) }
            .subscribeOn(Schedulers.elastic())
            .subscribe()
        ServerResponse.ok().build()
    }
}
this would still give the warning Calling 'Subscribe' in non-blocking scope
Brian Clozel
@bclozel
Assuming your repository implementation is blocking (and that would be why you're trying to offload that to another scheduler):
@Bean
fun audit(
    repository: AuditEventRepository
) = router {
    POST("/audit") { request ->
        Mono.fromCallable { repository.save("hello") }
            .subscribeOn(Schedulers.elastic())
            .then(ServerResponse.ok().build())
    }
}
Brian Clozel
@bclozel
Now back to the Calling subscribe within a reactive operator... explanation; let's put it another way. When a request comes in, if you call subscribe() on something consuming the request body while returning the response independently, it would be like, in the Servlet world, returning the response and yet kicking off work in a new thread that's not monitored/managed by anything. If your app shuts down before the work is done, everything is lost. If that job is operating on the request data directly and the memory associated with the request is "recycled" by the server, your job will fail on invalid data.
mplain
@mplain
@bclozel thank you!!
Oemer Genc
@oemergenc

Hi, currently I need some help with GroupedFlux. What I want is to group messages by id and then process a specific number of messages for each id, let's say 3 messages per id.
I have the following code:

        Flux.fromIterable(messages)
                .groupBy { it.key }
                .concatMap { it.collectList() }
                .flatMap { Flux.fromIterable(it.take(3)) }
                .subscribe { println(it.key) }

This works; however, if the grouped fluxes contain a lot of messages, the collectList() takes very long.
As I am only interested in 3 messages from each group this seems unnecessary, but I cannot find a proper way to prevent it.
Something like .concatMap { it.take(3).collectList() } always results in the following error:

Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.

Does anyone have an idea how this can be done?
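One way to sidestep both the collectList() cost and the QueueSubscription error, sketched below with illustrative code: count per key and filter up front, skipping groupBy entirely. (Note that take(3) inside a group cancels that group, and a later record with the same key would open a fresh group, which is another reason grouping is awkward here.)

// Cap at 3 messages per key without grouping or buffering whole groups.
// The 'key' accessor follows the message type from the question.
Map<String, Integer> perKey = new ConcurrentHashMap<>();
Flux.fromIterable(messages)
        .filter(m -> perKey.merge(m.key, 1, Integer::sum) <= 3)
        .subscribe(m -> System.out.println(m.key));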

Leonard Brünings
@leonard84

Hi, I have this code to calculate a regularly updating status as a hot producer.

Flux.concat(initialAggregatedStatus(),
                Flux.interval(Duration.ZERO, properties.getUpdateInterval())
                        .onBackpressureDrop(intervalCount -> {
                            log.warn("Too many connections pending, dropped status update request {}", intervalCount);
                            droppedUpdateCounter.increment();
                        })
                        .concatMap(ignore -> updateStatus()))
                .subscribeOn(Schedulers.parallel())
                .cache(1)

What is the correct way to stop this? takeWhile(ignore -> running), or .takeUntilOther(killSwitch) with a private DirectProcessor<String> killSwitch = DirectProcessor.create()?

Flux.concat(initialAggregatedStatus(),
                Flux.interval(Duration.ZERO, properties.getUpdateInterval())
                        .onBackpressureDrop(intervalCount -> {
                            log.warn("Too many connections pending, dropped status update request {}", intervalCount);
                            droppedUpdateCounter.increment();
                        })
                        .concatMap(ignore -> updateStatus()))
                .takeWhile(ignore -> running)
                .subscribeOn(Schedulers.parallel())
                .cache(1);

Or something else?

2 replies
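For what it's worth, a minimal kill-switch sketch along the lines of the second option, using the DirectProcessor from the question (Status and updates() stand in for the real types; newer Reactor versions would use Sinks instead of DirectProcessor):

// Emitting any signal on killSwitch stops the cached status flux.
private final DirectProcessor<String> killSwitch = DirectProcessor.create();

Flux<Status> status = Flux.concat(initialAggregatedStatus(), updates())
        .takeUntilOther(killSwitch)    // stops when killSwitch emits
        .subscribeOn(Schedulers.parallel())
        .cache(1);

void shutdown() {
    killSwitch.onNext("stop");
}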
Jørgen Ringen
@JorgenRingen
Using reactor-kafka, is there any way to continue on deserialization errors (kind of like log-and-continue-exception-handler in kafka-streams)?
Currently getting r.k.r.i.DefaultKafkaReceiver$PollEvent: Unexpected exception and it makes the whole "pipeline" fail.
1 reply
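One common workaround, sketched with placeholder names (MyEvent, MyEventDeserializer): deserialize defensively so that bad records surface as null instead of failing the poll, then filter the nulls out downstream:

// A delegating deserializer that skips undecodable payloads.
public class LogAndContinueDeserializer implements Deserializer<MyEvent> {
    private final Deserializer<MyEvent> delegate = new MyEventDeserializer();

    @Override
    public MyEvent deserialize(String topic, byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (Exception e) {
            // log-and-continue: the record arrives with a null value
            return null;
        }
    }
}

Downstream, drop the failed records with .filter(r -> r.value() != null).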
Alwyn Schoeman
@alwyn

Hi, I'm looking for the correct pattern to do the following:

I need to do 3 or more operations against an external system. The inputs and outputs of each operation are different from one another AND subsequent operations depend on the outputs of previous operations.

E.g. A -> B -> C -> D

Each following step checks a response code of the previous step.
If not successful it should not continue with the rest of the processing, but return the response of the previous operation.
If successful, it should use data from the previous response and perform the next action.

I am trying to avoid the scenario where I have the following nesting (pseudocode):

doA.flatMap { r ->
           if (r.code != 0) return@flatMap Mono.just(r)
           doB.flatMap { r ->
               if (r.code != 0) return@flatMap Mono.just(r)
               doC.flatMap { r ->
                   if (r.code != 0) return@flatMap Mono.just(r)
etc, etc, etc, etc
1 reply
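A flat version of that chain, sketched with the pseudocode's names (Response, code): because a failed response is passed along unchanged, each subsequent flatMap short-circuits on it without adding another nesting level:

// Each step checks the previous response; non-zero codes fall straight
// through every remaining flatMap, so the chain stays flat.
doA()
    .flatMap(r -> r.code != 0 ? Mono.just(r) : doB(r))
    .flatMap(r -> r.code != 0 ? Mono.just(r) : doC(r))
    .flatMap(r -> r.code != 0 ? Mono.just(r) : doD(r));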
Alex Mrynsky
@amrynsky
looking for a "reactive" way to work with piped input/output streams to compress data using GZIPOutputStream. Any hint is appreciated
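If the payload fits in memory, one non-piped approach (a sketch, with the blocking work offloaded rather than streamed) is to treat the whole compression as a Callable instead of wiring up PipedInput/OutputStream:

// Gzip a stream of chunks on boundedElastic; toIterable() blocks, which is
// acceptable on that scheduler. Assumes the compressed result fits in memory.
static Mono<byte[]> gzip(Flux<byte[]> chunks) {
    return Mono.fromCallable(() -> {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            for (byte[] chunk : chunks.toIterable()) {
                gz.write(chunk);
            }
        }
        return bos.toByteArray();
    }).subscribeOn(Schedulers.boundedElastic());
}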
Tomas Krasny
@krasnyt
Hi, can someone explain why the following code doesn't block when sending the 11th item? According to the docs, EmitterProcessor (if it has no subscribers) buffers incoming items up to bufferSize (which I set to 10). The next onNext should block (until the buffer is drained), but it doesn't. Instead it blocks on inserting the 17th element. Why does it successfully insert 16 elements? Why 16? (v3.3.10.RELEASE)
public static void main(String[] args) {
    EmitterProcessor<Integer> processor = EmitterProcessor.create(10);

    for (int i = 1; i <= 20; i++) {
        processor.onNext(i);
        System.out.println("sent: " + i);
    }

    processor.subscribe(System.out::println);
}
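A plausible answer to the "why 16", offered as a hedged guess based on Reactor's queue utilities: EmitterProcessor.create(10) backs its buffer with an array queue whose capacity is rounded up to the next power of two, so a requested size of 10 yields 16 slots, and onNext only blocks once all 16 are occupied. The rounding can be observed directly:

// Inspect the queue Reactor creates for bufferSize = 10; if the rounding
// assumption holds, this prints 16 (the next power of two).
Queue<Integer> q = Queues.<Integer>get(10).get();
System.out.println(Queues.capacity(q));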