All generic questions around Reactor. For advanced questions you can also try #reactor-core and #reactor-netty
I want to set up Context values for all publisher chains. This is what I came up with; not sure if I'm terribly ruining everything by doing so:
Hooks.onLastOperator("test-hook", publisher -> {
    if (publisher instanceof Mono) {
        return ((Mono<Object>) publisher).subscriberContext(SomeClass::initContext);
    }
    return ((Flux<Object>) publisher).subscriberContext(SomeClass::initContext);
});
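For reference, a downstream read of whatever initContext puts into the Context might look like this (a sketch; the "someKey" key and its fallback value are hypothetical, the era-appropriate Mono.subscriberContext() API is assumed):
Mono.subscriberContext()
    .map(ctx -> ctx.getOrDefault("someKey", "fallback")) // "someKey" is a placeholder key
    .subscribe(value -> LOG.info("context value: {}", value));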
Hello! I am using Reactor Kafka. I consume a batch of messages from Kafka and store it in the DB asynchronously, and the operation returns CompletableFutures.
This is what I have so far:
Flux<K> flux = receiver.receiveAtmostOnce();
flux
    .bufferTimeout(maxBatchSize, batchTimeout)
    .onBackpressureBuffer()
    .publishOn(Schedulers.boundedElastic())
    .doOnError(th -> {
        LOG.warn("Exception while handling records", th);
        latch.countDown();
    })
    .doOnCancel(() -> LOG.info("Kafka receiver stopped"))
    .subscribe(records -> {
        completableFutureArray = processRecords(records);
        // blocking call that I want to avoid
        completableFutureArray.join();
    });
I understand using a blocking call is bad, but I don't know how to avoid it here.
Any help/suggestion would be appreciated.
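One way to drop the blocking join() (a sketch, assuming processRecords returns an array of CompletableFutures and that per-batch completion is acceptable) is to bridge the futures into the chain with Mono.fromFuture, so completion is signalled asynchronously instead of joined:
flux
    .bufferTimeout(maxBatchSize, batchTimeout)
    .publishOn(Schedulers.boundedElastic())
    .flatMap(records ->
        // non-blocking: the Mono completes when all futures complete
        Mono.fromFuture(CompletableFuture.allOf(processRecords(records))))
    .doOnError(th -> LOG.warn("Exception while handling records", th))
    .subscribe();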
I need help reading the form-field values from a ServerRequest object; it holds only the Part content.
When I iterate the partFile object, it doesn't hold the form-field values.
public Mono<ServerResponse> getUploadFiles(ServerRequest request) {
    return request.multipartData()
        .flux()
        .flatMap(map -> Flux.fromIterable(map.values()))
        .collectList()
        .flatMap(partFile -> ServerResponse.ok()
            .body(Mono.just(fileStorageService.storeZipFile(partFile.get(0), "", "")), String.class))
        .onErrorResume(
            JsonProcessingException.class,
            e -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(e.getMessage()), String.class));
}
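In case it helps, form fields in a multipart request arrive as FormFieldPart, so they can be read separately from file parts (a sketch; "fieldName" is a placeholder name):
return request.multipartData().flatMap(parts -> {
    Part part = parts.getFirst("fieldName"); // "fieldName" is a placeholder
    String value = part instanceof FormFieldPart
            ? ((FormFieldPart) part).value() // form fields expose their value via value()
            : null;
    return ServerResponse.ok().body(Mono.just(String.valueOf(value)), String.class);
});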
Hello, I'm trying to understand how gRPC Client Cancellation works when using https://github.com/salesforce/reactive-grpc. I have an example program that creates a simple service, something like the following:
@Slf4j
@GrpcService
public class Service extends ReactorServiceGrpc.ServiceImplBase {
    @Override
    public Mono<User> getUser(Mono<GetUserRequest> requestMono) {
        return requestMono
            .doOnNext(__ -> log.info("Got a request"))
            .thenReturn(User.newBuilder().build())
            .delayElement(Duration.ofSeconds(5))
            .doOnTerminate(() -> log.info("Terminated"));
    }
}
I expect to see "Terminated" shortly after "Got a request" if I cancel the call from the client. However, I see the following output:
2020-10-25 16:17:09.633 [ault-executor-0] : Got request!
2020-10-25 16:17:14.634 [ parallel-2] : Terminated
2020-10-25 16:17:14.637 [ parallel-2] r.c.p.Operators : Operator called default onErrorDropped
io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
Any ideas on why this happens and why I don't get the expected behavior?
Hi! Dear Reactor team, I can't understand why this test case doesn't pass:
when:
//def scheduler = VirtualTimeScheduler.getOrSet()
def rp = Sinks.many().replay().limit(Duration.ofSeconds(1))
def flux = rp.asFlux().doOnNext({
    log.info it.toString()
})
for (int i = 0; i < 5; i++) {
    rp.emitNext(i, Sinks.EmitFailureHandler.FAIL_FAST);
}
Thread.sleep(2000L)
//scheduler.advanceTimeBy(Duration.ofSeconds(2))
for (int i = 5; i < 10; i++) {
    rp.emitNext(i, Sinks.EmitFailureHandler.FAIL_FAST);
}

then:
StepVerifier.create(flux)
    .expectNext(5, 6, 7, 8, 9)
    .thenCancel()
    .verify()
This test case passes only if I uncomment the virtual scheduler lines.
I'm looking for an operator like Flux.sample(Duration), but one which emits an element immediately if no previous element was received in the preceding sample window, and then resets the sample window from that point. As far as I can see, the Flux.sample(Duration) operator will always wait until the current sample window expires before emitting, even if nothing was emitted in the previous window. Does something like this already exist? I suspect it might be possible to implement what I need with the Flux.sample(Publisher) variant, but I'm not sure how to approach it, so any pointers here would be much appreciated. Thanks!
Hello!
I've discovered something strange
Server side:
GET("/flux") {
val flux = Flux.range(0, 5)
// .map(Int::toString)
.delayElements(Duration.ofSeconds(1))
ServerResponse.ok().body(flux)
}
Client side:
WebClient.create("http://localhost:8080")
.get()
.uri("/flux")
.exchange()
.doOnNext { logger.info(it.statusCode().toString()) }
.doOnNext { it.headers().asHttpHeaders().forEach { logger.info("${it.key}: ${it.value}") } }
.flatMapMany { it.bodyToFlux<String>() }
.subscribe(logger::info)
If the server returns a flux of integers,
then the response content-type is set to application/json,
the client waits for 5 seconds and then receives the response and body together,
and the body is an array of integers:
2020-11-01 23:44:00 - GET /flux
2020-11-01 23:44:06 - 200 OK
2020-11-01 23:44:06 - transfer-encoding: [chunked]
2020-11-01 23:44:06 - Content-Type: [application/json]
2020-11-01 23:44:06 - [0,1,2,3,4]
If the server returns a flux of strings,
then the response content-type is set to text/plain,
the client receives the response immediately, then waits for 5 seconds before receiving the body,
and the body is one concatenated string:
2020-11-01 23:45:07 - GET /flux
2020-11-01 23:45:08 - 200 OK
2020-11-01 23:45:08 - transfer-encoding: [chunked]
2020-11-01 23:45:08 - Content-Type: [text/plain;charset=UTF-8]
2020-11-01 23:45:12 - 01234
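The aggregation in the first case comes from the JSON codec, which presumably serializes the whole Flux as a single JSON array. If element-by-element JSON streaming is the goal, one option (a sketch in Java rather than the Kotlin DSL above, assuming Spring Framework 5.3's MediaType.APPLICATION_NDJSON is available) is to ask for a streaming media type explicitly:
RouterFunction<ServerResponse> route = RouterFunctions.route()
        .GET("/flux", req -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_NDJSON) // newline-delimited JSON is flushed per element
                .body(Flux.range(0, 5).delayElements(Duration.ofSeconds(1)), Integer.class))
        .build();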
Hi there, I'm trying to implement a custom queue for a UnicastProcessor inside a Spring application, to handle bursts (I just want to drop the incoming message if there are too many). My problem is, if I do
UnicastProcessor<WebSocketMessage> publisher = UnicastProcessor.create(myCustomQueue);
then my application totally freezes over time, and NOTHING works: there are no more logs or anything like that. However, if I create the UnicastProcessor with the no-argument create(), then everything works fine.
Here's my custom queue implementation, it's really simple:
public class LimitedQueue<E> extends ArrayDeque<E> {
    private final int limit;

    public LimitedQueue(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean offer(E o) {
        if (size() >= limit) {
            return false;
        }
        return super.offer(o);
    }
}
I found a really similar question from 8 months ago here as well:
https://stackoverflow.com/questions/60193240/reactor-unicastprocessor-drop-on-backpressure-when-queue-is-full
Could anyone help me out?
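Two things stand out here (an observation, not a confirmed diagnosis): ArrayDeque is not thread-safe, while the queue UnicastProcessor creates by default (Queues.unbounded()) is a concurrent one, and the processor treats a false return from offer() as an overflow condition rather than a silent drop. A sketch of a queue that drops silently while staying thread-safe:
public class DroppingQueue<E> extends ConcurrentLinkedQueue<E> {
    private final int limit;

    public DroppingQueue(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean offer(E e) {
        // note: size() is O(n) on ConcurrentLinkedQueue
        if (size() >= limit) {
            return true; // drop the element but report success, so the processor does not signal overflow
        }
        return super.offer(e);
    }
}
It would then be used as UnicastProcessor.create(new DroppingQueue<>(limit)).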
Hi everyone, I'm trying Reactor Kafka (I'm basically a newbie at it) and made the samples work. However, the consumer keeps spamming the following logs:
2020-11-04 10:01:36.007+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.FetchSessionHandler : [Consumer clientId=sample-consumer, groupId=sample-group] Node 0 sent an incremental fetch response for session 569816377 with 0 response partition(s), 1 implied partition(s)
2020-11-04 10:01:36.008+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.c.i.Fetcher : [Consumer clientId=sample-consumer, groupId=sample-group] Added READ_UNCOMMITTED fetch request for partition test2-0 at position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=abbelfynsrv:9092 (id: 0 rack: null), epoch=0}} to node abbelfynsrv:9092 (id: 0 rack: null)
2020-11-04 10:01:36.008+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.FetchSessionHandler : [Consumer clientId=sample-consumer, groupId=sample-group] Built incremental fetch (sessionId=569816377, epoch=35) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
2020-11-04 10:01:36.008+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.c.i.Fetcher : [Consumer clientId=sample-consumer, groupId=sample-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(test2-0)) to broker abbelfynsrv:9092 (id: 0 rack: null)
Is there a way to disable this? I can show my consumer's code if needed.
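Those lines are DEBUG output from the underlying Apache Kafka client (o.a.k.c.FetchSessionHandler, o.a.k.c.c.i.Fetcher) rather than from Reactor Kafka itself, so raising the level for its loggers should silence them (a sketch, assuming Spring Boot's logging properties):
# application.properties
logging.level.org.apache.kafka=INFO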
Hi all,
I'm developing a Spring Boot Reactor-based application that we're currently performance testing. The basic data flow is:
Now we're having issues. When the system is under load, our DB reports responding within < 200ms in most cases, while a lot of our trace data shows that we only receive the first result from the DB in the code after ~1s. I have no clue where that time difference is coming from.
Some things I have tried:
The only thing left for me to try is to throw out the r2dbc-postgresql driver, replace it with plain old JDBC, and just schedule that on a boundedElastic scheduler?
Any idea is welcome...
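For reference, that JDBC fallback usually looks something like this (a sketch; jdbcTemplate and the query are hypothetical):
Mono<List<Map<String, Object>>> rows =
        Mono.fromCallable(() -> jdbcTemplate.queryForList("SELECT ...")) // blocking JDBC call
                .subscribeOn(Schedulers.boundedElastic());               // keeps it off the event loop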
I've got a library which parses a lot of XML data and requires a callback for what should be done with the parsed data. Now I'm trying to incorporate this lib into my reactive application like this:
private fun InputStream.parse(): Flux<ParsedProduct> = Flux.create { sink ->
    val requested = AtomicLong(0)
    sink.onRequest {
        if (logger.isTraceEnabled) {
            logger.trace("Requested ParsedProducts", kv("number", it))
        }
        requested.addAndGet(it)
    }
    val parser = XmlArchiveParser(ProductParser()) {
        if (logger.isTraceEnabled) {
            logger.trace("Parsed product", kv("productId", it.productId))
        }
        while (requested.get() <= 0) {
            logger.trace { "Waiting for requests" }
            Thread.sleep(5)
        }
        sink.next(it)
        requested.decrementAndGet()
    }
    parser.importAndProcessProducts(this)
    sink.complete()
}
Is this a proper way to implement this hybrid push/pull backpressure mechanism? If so, which thread is currently blocked by the Thread.sleep? Should I subscribe this on boundedElastic()?
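For what it's worth, the create() block, and therefore importAndProcessProducts and the busy-wait, run synchronously on whatever thread performs the subscription, so moving that work onto boundedElastic seems like a reasonable instinct (a sketch; parsedProducts stands in for the Flux returned by parse() above):
parsedProducts
        .subscribeOn(Schedulers.boundedElastic()) // the Thread.sleep then blocks a boundedElastic worker
        .subscribe(p -> logger.trace("consumed {}", p));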
I'm using Spring WebClient (based, afaik, on the Reactor Netty HttpClient?).
I'm sending a stream of 5 values with a one-second delay (Flux<Object>, content-type = application/x-ndjson) to the server, and receiving the response as a Flux as well.
I expect to see in the logs something like this:
object #1 send
response #1 received
object #2 send
response #2 received
...
That is, objects are sent one by one, and responses are also received one by one.
But WebClient starts receiving the response only after it has finished sending the request.
I've found a similar question on SO: https://stackoverflow.com/questions/52113235
In it, @bclozel says that Spring WebClient and Reactor Netty HttpClient only start processing the response after they're finished processing the request
Also, @violetagg told me that if the Netty server sends a response before it has finished processing the request body, then the remaining request body is discarded (?), as per the HTTP spec (?)
Could you please confirm that this is the way WebFlux and Reactor Netty work by design?
Interestingly, another answer on SO says that the Jetty-based WebClient works differently: it is able to start receiving the response before it finishes sending the request.
Is the restriction on the client side, or server side, or both?
@mplain
Also, @violetagg told me that if the Netty server sends a response before it has finished processing the request body, then the remaining request body is discarded (?), as per the HTTP spec (?)
And this is true if you finalise the response, not if you merely start writing it:
HttpServer server =
HttpServer.create()
.port(PORT)
.route(r -> r.post("/echo",
(req, res) -> res.header(CONTENT_TYPE, TEXT_PLAIN)
.send(req.receive().retain())));
then the remaining request body is discarded (?), as per the HTTP spec (?)
https://tools.ietf.org/html/rfc7230#section-6.3
A server MUST read
the entire request message body or close the connection after sending
its response, since otherwise the remaining data on a persistent
connection would be misinterpreted as the next request.
I have a Publisher that needs to acquire a Lock beforehand. After execution the Lock has to be released. Now, doFinally sadly executes after the completion of the Publisher, so there is no way (that I know of) for me to test this behavior. What would you suggest for handling this?
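One pattern worth considering (a sketch; a Semaphore stands in for the Lock because, unlike ReentrantLock, it can safely be released from a different thread, and doWork() is a hypothetical guarded publisher): Mono.usingWhen ties acquisition and release to the subscription lifecycle, and the cleanup step is itself a publisher you can hook assertions onto.
Semaphore lock = new Semaphore(1);
Mono<String> guarded = Mono.usingWhen(
        Mono.fromRunnable(lock::acquireUninterruptibly).thenReturn(lock), // acquire on subscription
        l -> doWork(),                                                    // hypothetical guarded work
        l -> Mono.fromRunnable(l::release));                              // release on complete, error, or cancel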
Hi, I have a handler method in which I need to persist data in the DB, and once persisted, I need to send out some events (HTTP calls via the reactive WebClient) with the persisted data. I want to return control as soon as the DB operation is completed; the event sending needs to happen as a background job. I'm having trouble assembling the Mono operator chain for this. Here is what I have:
Mono<ServerResponse> handleRequest(ServerRequest request) {
    var persistedMono = Mono.just("Hello")
            .flatMap(this::persistData)
            .doOnError(this::sendErrorEvent);
    persistedMono.publishOn(Schedulers.parallel())
            .doOnSuccess(this::sendEvents)
            .subscribe();
    return persistedMono.flatMap(result -> ServerResponse.ok()
            .body(BodyInserters.fromValue(result)));
}
The problem here is that persistedMono is subscribed twice, so persistData is called twice.
How can I address this issue and send the events in the background?
Thanks in advance for any suggestions/solution
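One way to avoid the double subscription (a sketch reusing the names from the question): keep a single chain and fire the event publishing as a side effect, so persistData runs exactly once per request.
Mono<ServerResponse> handleRequest(ServerRequest request) {
    return Mono.just("Hello")
            .flatMap(this::persistData)
            .doOnError(this::sendErrorEvent)
            .doOnNext(result ->
                    // fire-and-forget: the response does not wait for the events
                    Mono.fromRunnable(() -> sendEvents(result))
                            .subscribeOn(Schedulers.parallel())
                            .subscribe())
            .flatMap(result -> ServerResponse.ok()
                    .body(BodyInserters.fromValue(result)));
}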
public Flux<TopicCommentResponse> topicComments(String language, long topicId) {
    return databaseClient.execute(
            "SELECT tc.*, u.username FROM topic_comment tc JOIN users u ON tc.user_id = u.id " +
            "WHERE tc.topic_id = :topicId AND tc.parent_id IS NULL ORDER BY tc.created_at DESC")
        .bind("topicId", topicId)
        .as(TopicCommentResponse.class)
        .fetch()
        .all()
        .flatMap(comment ->
            Mono.just(comment)
                .zipWith(databaseClient.execute(
                        "SELECT tc.*, u.username FROM topic_comment tc JOIN users u ON tc.user_id = u.id " +
                        "WHERE tc.topic_id = :topicId AND tc.parent_id = :parentId ORDER BY tc.created_at DESC")
                    .bind("topicId", topicId)
                    .bind("parentId", comment.getId())
                    .as(TopicCommentResponse.class)
                    .fetch()
                    .all()
                    .collectList())
                .map(tupla -> tupla.getT1().withAnswers(tupla.getT2()))
        );
}
Hello people!
I am trying to implement reactive RabbitMQ in my Spring WebFlux application, but I am facing issues. Following the tutorial from the Reactor RabbitMQ reference guide, this is what I am trying to do:
sender.send(Mono.just(message))
    .doOnSuccess(a -> System.out.println("Delivered"))
    .doOnError(e -> logger.error("error", e))
    .subscribe();
Please let us know what might be going wrong! I have not been able to find a solution on the internet.
I'd like to collect incoming elements into a PriorityQueue to order them before emitting them to downstream subscribers. This seems like it could be achieved through a custom Processor, but there seems to be a consensus discouraging that. If not via a custom Processor, then how?
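A processor-free sketch of the idea (assuming ordering within fixed-size windows is acceptable, since a total order would require all elements up front; source, Event, and comparator are hypothetical):
Flux<Event> ordered = source
        .buffer(100)                          // window size is an assumption
        .concatMap(batch -> {
            PriorityQueue<Event> pq = new PriorityQueue<>(comparator);
            pq.addAll(batch);
            int n = pq.size();
            return Flux.fromStream(Stream.generate(pq::poll).limit(n)); // drain in priority order
        });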
Hi everyone. I am new to Spring Reactor and need some help implementing a simple use case. I have two monos (mono1 and mono2) of the same type, Mono<List<Type1>>, where Type1 is something like this:
public class ErrorCountsByShortNameDto implements Serializable {
private String errorShortName;
private String deploymentName;
private Long errorCount;
}
Now I need to loop over the lists in the two monos and match the Type1 items whose errorShortName is the same. Can anyone point me in the right direction, or to an older message here that does this?
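A sketch of one way to do the matching (assuming Lombok-style getters on the DTO; mono1 and mono2 are the monos from the question):
Mono<List<Tuple2<ErrorCountsByShortNameDto, ErrorCountsByShortNameDto>>> matched =
        mono1.zipWith(mono2, (list1, list2) -> list1.stream()
                .flatMap(a -> list2.stream()
                        .filter(b -> a.getErrorShortName().equals(b.getErrorShortName()))
                        .map(b -> Tuples.of(a, b))) // pair up items sharing an errorShortName
                .collect(Collectors.toList()));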
I have Mono.fromFuture(cf). How do I keep repeating this mono (something like .repeat()) but stop it as soon as cf isn't producing output anymore? You can consider cf a reactive API for a stream, but since it completes only once, I don't know how to keep polling for data and stop when the stream is finished.
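A sketch of one approach (assuming you can re-invoke a supplier for a fresh future on each poll, and that end-of-stream can be signalled with an empty Optional; poll() and Data are hypothetical):
Flux<Data> all = Mono.defer(() -> Mono.fromFuture(poll())) // poll() returns CompletableFuture<Optional<Data>>
        .repeat()                                          // resubscribe after each completion
        .takeWhile(Optional::isPresent)                    // cancel the repeat once the stream is exhausted
        .map(Optional::get);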