@mplain
Also, @violetagg told me that if the Netty server sends a response before it has finished processing the request body, then the remaining request body is discarded (?), as per the HTTP spec (?)
And this holds if you finalise the response, but not if you merely start writing it
HttpServer server =
        HttpServer.create()
                  .port(PORT)
                  .route(r -> r.post("/echo",
                          (req, res) -> res.header(CONTENT_TYPE, TEXT_PLAIN)
                                           .send(req.receive().retain())));
then the remaining request body is discarded (?), as per the HTTP spec (?)
https://tools.ietf.org/html/rfc7230#section-6.3
A server MUST read
the entire request message body or close the connection after sending
its response, since otherwise the remaining data on a persistent
connection would be misinterpreted as the next request.
Lock beforehand. After execution the Lock has to be released. Now doFinally sadly executes after the completion of the Publisher, so there is no way (that I know of) to test this behavior. How would you suggest I handle this?
Hi, I have a handler method in which I need to persist data in the DB and, once persisted, send out some events (HTTP calls via the reactive WebClient) with the persisted data. I want to return control as soon as the DB operation completes; the event sending needs to happen as a background job. I am having trouble assembling the Mono operator chain for this. Here is what I have:
Mono<ServerResponse> handleRequest(ServerRequest request) {
    var persistedMono = Mono.just("Hello")
            .flatMap(this::persistData)
            .doOnError(this::sendErrorEvent);
    persistedMono.publishOn(Schedulers.parallel())
            .doOnSuccess(this::sendEvents)
            .subscribe();
    return persistedMono.flatMap(result -> ServerResponse.ok()
            .body(BodyInserters.fromValue(result)));
}
The problem here is that persistedMono is subscribed to twice, so persistData is called twice.
How can I address this and still send the events in the background?
Thanks in advance for any suggestions/solutions
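One way out, sketched under my own assumption rather than confirmed here in the room: cache() memoizes the first subscription's outcome, so the background event branch and the response branch share a single persistData call.

Mono<ServerResponse> handleRequest(ServerRequest request) {
    var persistedMono = Mono.just("Hello")
            .flatMap(this::persistData)
            .doOnError(this::sendErrorEvent)
            .cache(); // the second subscriber replays the cached result
    persistedMono.publishOn(Schedulers.parallel())
            .doOnSuccess(this::sendEvents)
            .subscribe(); // fire-and-forget event branch
    return persistedMono.flatMap(result -> ServerResponse.ok()
            .body(BodyInserters.fromValue(result)));
}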
public Flux<TopicCommentResponse> topicComments(String language, long topicId) {
    return databaseClient.execute(
            "SELECT tc.*, u.username FROM topic_comment tc JOIN users u ON tc.user_id = u.id " +
            "WHERE tc.topic_id = :topicId AND tc.parent_id IS NULL ORDER BY tc.created_at DESC")
            .bind("topicId", topicId)
            .as(TopicCommentResponse.class)
            .fetch()
            .all()
            .flatMap(comment ->
                    Mono.just(comment)
                            .zipWith(databaseClient.execute(
                                    "SELECT tc.*, u.username FROM topic_comment tc JOIN users u ON tc.user_id = u.id " +
                                    "WHERE tc.topic_id = :topicId AND tc.parent_id = :parentId ORDER BY tc.created_at DESC")
                                    .bind("topicId", topicId)
                                    .bind("parentId", comment.getId())
                                    .as(TopicCommentResponse.class)
                                    .fetch()
                                    .all()
                                    .collectList())
                            .map(tupla -> tupla.getT1().withAnswers(tupla.getT2())));
}
Hello people!
I am trying to use reactive RabbitMQ in my Spring WebFlux application, but I am running into issues. Following the Reactor RabbitMQ reference guide, this is what I am trying to do:
sender.send(Mono.just(message))
        .doOnSuccess(a -> System.out.println("Delivered"))
        .doOnError(e -> logger.error("error", e))
        .subscribe();
Please let us know what might be going wrong! I have not been able to find a solution on the internet.
PriorityQueue to order them before emitting them to downstream subscribers. This seems like it could be achieved through a custom Processor, but there seems to be a consensus discouraging that. If not via a custom Processor, then how?
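A minimal sketch of one Processor-free approach, assuming the elements arrive in bursts that can each be ordered independently (an assumption, since the use case isn't fully spelled out):

// Buffer the source into small windows, order each window with a PriorityQueue,
// and re-emit in order; no custom Processor involved. Window size/time are guesses.
static Flux<Integer> ordered(Flux<Integer> source) {
    return source
            .bufferTimeout(64, Duration.ofMillis(100))
            .concatMap(batch -> {
                PriorityQueue<Integer> queue = new PriorityQueue<>(batch);
                List<Integer> sorted = new ArrayList<>(batch.size());
                while (!queue.isEmpty()) {
                    sorted.add(queue.poll());
                }
                return Flux.fromIterable(sorted);
            });
}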
Hi everyone. I am new to Reactor and need some help implementing a simple use case. I have two monos (mono1 and mono2) of the same type, Mono<List<Type1>>, where Type1 looks like this:
public class ErrorCountsByShortNameDto implements Serializable {
    private String errorShortName;
    private String deploymentName;
    private Long errorCount;
}
Now I need to loop over the lists in the two monos and match the Type1 items whose errorShortName is the same. Can anyone point me in the right direction, or to an older message here that does this?
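A minimal sketch under my reading of the question, assuming the usual getters exist and errorShortName is unique within the second list (both assumptions):

// Mono.zip waits for both lists; an index on errorShortName then pairs them up.
Mono<List<Tuple2<ErrorCountsByShortNameDto, ErrorCountsByShortNameDto>>> matched =
        Mono.zip(mono1, mono2).map(tuple -> {
            Map<String, ErrorCountsByShortNameDto> byName = tuple.getT2().stream()
                    .collect(Collectors.toMap(
                            ErrorCountsByShortNameDto::getErrorShortName,
                            Function.identity()));
            return tuple.getT1().stream()
                    .filter(dto -> byName.containsKey(dto.getErrorShortName()))
                    .map(dto -> Tuples.of(dto, byName.get(dto.getErrorShortName())))
                    .collect(Collectors.toList());
        });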
Mono.fromFuture(cf) — how do I keep repeating this mono (something like .repeat()) but stop as soon as cf stops producing output? You can consider cf a reactive API for a stream, but since it completes once, I don't know how to keep polling for data and stop when the stream is finished.
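A minimal sketch under two assumptions of mine, since the original doesn't show where cf comes from: a fresh CompletableFuture is obtained per poll (hypothetical pollNext()), and end-of-stream is signalled by an empty Optional.

// defer() makes each repeat cycle call pollNext() again instead of reusing the
// same already-completed future; takeWhile stops the repetition at end-of-stream.
// pollNext(): CompletableFuture<Optional<Item>>, with Item a placeholder type.
Flux<Item> stream = Mono.defer(() -> Mono.fromFuture(pollNext()))
        .repeat()
        .takeWhile(Optional::isPresent)
        .map(Optional::get);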
Blockhound.install() throws a blocking-call error for webClient.<...>.awaitBody(), but Blockhound.builder().with(CoroutinesBlockHoundIntegration()).install() does not throw.
Hello All. I'm trying to use the Sinks API to pass signals to consumers in a certain way, but it looks like I'm doing something incorrectly. Could somebody help/take a look?
What I would like to do is:
sink = Sinks.many().replay().limit(EVENT_AGE_LIMIT);
and then expose it as:
sink.asFlux()
I wrote a simple test that ensures that a new subscriber still receives an event that is less than EVENT_AGE_LIMIT old (let's say 2000ms):
// arrange
CountDownLatch received = new CountDownLatch(1);
Sinks.Many<Object> sink = Sinks.many().replay().limit(Duration.ofMillis(2000));
// act
Sinks.EmitResult emittedResult = sink.tryEmitNext("event A");
Thread.sleep(1800);
sink.asFlux().publishOn(Schedulers.boundedElastic()).subscribe(e -> received.countDown());
// verify
assertThat(emittedResult.isSuccess()).isTrue();
assertThat(received.await(200, TimeUnit.MILLISECONDS)).isTrue();
The test fails at the assertion in the last line. However, if I change the Sink to: Sinks.Many<Object> sink = Sinks.many().replay().all();
then it passes.
Does anybody know what I could be doing wrong?
doFirst to get a lock and doFinally to release the lock. Is this the right pattern, or is there a more idiomatic way to do this?
@RahulKushwaha what a strange coincidence, I'm doing exactly the same right now and was about to ask if it's alright or not,
except that I'm not using doFirst, because doFirst runs even before subscription, which as far as I understand happens right away; that means your code would lock even before it's subscribed to, and I doubt you want that.
Here's my code
@Override
public <V> Mono<V> transaction(Id id, Callable<V> action) {
    return Mono.fromCallable(action)
            .doOnSubscribe(s -> {
                locks.putIfAbsent(id, new ReentrantLock());
                locks.get(id).lock();
            })
            .doFinally(f -> locks.get(id).unlock());
}
Hope one of the experts here will let us know if this is an abuse of Reactor or a totally fine thing.
P.S. I'm locking on a certain id through a map.
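Not an authoritative verdict, but a sketch of an alternative worth comparing: Mono.using ties acquisition to subscription and release to termination or cancellation in a single operator. One caveat baked into this sketch: a ReentrantLock must be released by the thread that acquired it, so this (like the doOnSubscribe/doFinally version) assumes no thread hop between acquisition and release.

// Sketch only, reusing the same locks map: the lock is taken when the Mono is
// subscribed and released on complete, error, or cancel.
public <V> Mono<V> transaction(Id id, Callable<V> action) {
    return Mono.using(
            () -> {
                Lock lock = locks.computeIfAbsent(id, k -> new ReentrantLock());
                lock.lock();
                return lock;
            },
            lock -> Mono.fromCallable(action),
            Lock::unlock);
}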
Hello everybody, why does this code not work as expected?
@Test
@SneakyThrows
void simple() {
    final var emit = Sinks.many().multicast().onBackpressureBuffer();
    Flux.interval(Duration.ofSeconds(5), Schedulers.newSingle("special-timer")).log()
            .filterWhen(ll -> emit.asFlux().next().hasElement()).log()
            .subscribeOn(Schedulers.newSingle("under-test"))
            .subscribe(ll -> System.out.println(ll));
    for (int i = 0; i <= 100; i += 1) {
        emit.tryEmitNext(0L);
        TimeUnit.SECONDS.sleep(10);
    }
}
I thought that each subscriber in the filterWhen statement would wait for one element, but actually only the first subscription completes. All the others still wait for an element...
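A guess on my part, not a confirmed diagnosis: the default multicast sink terminates once its last subscriber cancels (autoCancel), and emit.asFlux().next() cancels right after receiving its single element, so every later filterWhen subscription sees an already-terminated sink. The overload that disables autoCancel would keep the sink alive between subscriptions:

// Hypothetical fix sketch: autoCancel = false lets the sink survive the cancel
// that next() issues after its first element.
final Sinks.Many<Long> emit = Sinks.many().multicast()
        .onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);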
I need some help understanding the threading behaviour of Mono.fromCallable(..). I have this code:
public static void main(String[] args) throws InterruptedException {
    Flux.range(1, 100_000)
            .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
            .flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic()))
            .subscribe();
    System.out.println("Here");
    Thread.sleep(Integer.MAX_VALUE);
}

private static int blockingMethod(int s) {
    try {
        Thread.sleep(100_000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return s;
}
What's happening is:
The first 256 elements are printed on main (in doOnNext) as expected.
After around 1 second, the next 256, then the next and so on. Elements from the second batch onwards are printed on elastic threads.
I would expect all the elements to be printed on the main thread. Why do I observe this behaviour? I have also asked this question here on Stack Overflow.
TIA
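My understanding (a hedged guess based on the usual flatMap mechanics, not an official reply): flatMap has a default concurrency of 256 (Queues.SMALL_BUFFER_SIZE), so range only receives an initial request of 256. The replenishing requests are issued as inner Monos terminate, which happens on the elastic threads, and that is where the later doOnNext calls run. Passing the concurrency explicitly makes the batch size visible:

// Sketch: same pipeline with an explicit concurrency hint; the first batch prints
// on main, later batches print on the workers that complete the inner Monos and
// thereby trigger the next upstream request.
int concurrency = 256; // flatMap's default
Flux.range(1, 100_000)
        .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
        .flatMap(a -> Mono.fromCallable(() -> blockingMethod(a))
                .subscribeOn(Schedulers.boundedElastic()), concurrency)
        .subscribe();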
SslContextBuilder.forClient().keyManager(clientCrtFile, clientKeyFile, clientKeyPassword)
private static void fluxTest2(List<Supplier<Map<String, Object>>> maps) {
    Map<String, Object> reduce = Flux.fromIterable(maps)
            .buffer(5)
            .parallel(maps.size() / 5)
            .runOn(scheduler)
            .map(entry -> {
                long start = System.currentTimeMillis();
                Map<String, Object> resMap = Maps.newHashMapWithExpectedSize(entry.size());
                for (Supplier<Map<String, Object>> supplier : entry) {
                    Map<String, Object> realData = supplier.get();
                    resMap.putAll(realData);
                }
                System.out.println("step time: " + (System.currentTimeMillis() - start));
                return resMap;
            })
            .reduce((m1, m2) -> {
                m1.putAll(m2);
                return m1;
            })
            .block();
    System.out.println(reduce);
    Map<String, Object> map = reduce;
    Integer t = map.values().stream()
            .map(v -> Integer.parseInt(String.valueOf(v)))
            .reduce(Integer::sum)
            .get();
    System.out.println("size: " + map.size() + "\ttime: " + t);
}
@GetMapping(produces = "application/octet-stream")
public Mono<ResponseEntity<FileSystemResource>> getZipFile(ServerHttpRequest request) {
    try {
        FileSystemResource zipFile = new FileSystemResource("result.zip");
        ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile.getFile()));
        out.putNextEntry(new ZipEntry("abc2.txt"));
        out.write("abc123".getBytes());
        out.flush();
        out.closeEntry();
        out.close(); // without close() the zip's central directory is never written
        ResponseEntity<FileSystemResource> response = ResponseEntity
                .ok()
                .cacheControl(CacheControl.noCache())
                .contentType(MediaType.APPLICATION_OCTET_STREAM)
                .header("Content-Disposition", "attachment;filename=result.zip")
                .body(zipFile);
        return Mono.just(response);
    } catch (Exception e) {
        e.printStackTrace();
    }
    // just returning something so it compiles
    return Mono.just(ResponseEntity.status(400).body(null));
}
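No question was stated with this snippet, but one thing stands out: the archive is built with blocking I/O directly in the handler. A sketch of deferring that work to boundedElastic, with try-with-resources handling the close:

// Sketch: build the zip off the event loop; closing the ZipOutputStream is what
// writes the central directory, so try-with-resources matters here.
return Mono.fromCallable(() -> {
    FileSystemResource zipFile = new FileSystemResource("result.zip");
    try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile.getFile()))) {
        out.putNextEntry(new ZipEntry("abc2.txt"));
        out.write("abc123".getBytes());
        out.closeEntry();
    }
    return ResponseEntity.ok()
            .cacheControl(CacheControl.noCache())
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .header("Content-Disposition", "attachment;filename=result.zip")
            .body(zipFile);
}).subscribeOn(Schedulers.boundedElastic());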
Can anyone suggest a way to lazily subscribe to a Flux being used to source a unicast sink? I'd like to defer subscribing until after the Flux view of the sink (returned by the method) has been subscribed to:
private Flux<Integer> getSinkFlux(Flux<Integer> srcFlux) {
    var dstSink = Sinks.many().unicast().<Integer>onBackpressureBuffer();
    var dstFlux = dstSink.asFlux()
            .log("reactor.Flux.DESTINATION", Level.WARNING);
    // todo: defer the following until subscriber on dstFlux
    srcFlux.subscribe(
            srcVal -> dstSink.emitNext(srcVal, Sinks.EmitFailureHandler.FAIL_FAST));
    return dstFlux;
}
I get a warning in IntelliJ about the srcFlux.subscribe in this context, which also makes me suspect I'm doing something incorrect. Thanks!
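A possible shape, sketched under the assumption that the only goal is to delay wiring the bridge until dstFlux has a subscriber: Flux.defer runs its supplier once per downstream subscription, so nothing touches srcFlux until then. This version also forwards error and completion, which the original bridge dropped.

private Flux<Integer> getSinkFlux(Flux<Integer> srcFlux) {
    return Flux.defer(() -> {
        var dstSink = Sinks.many().unicast().<Integer>onBackpressureBuffer();
        // The bridge starts only when dstFlux is subscribed; terminal signals pass through.
        srcFlux.subscribe(
                v -> dstSink.emitNext(v, Sinks.EmitFailureHandler.FAIL_FAST),
                e -> dstSink.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST),
                () -> dstSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST));
        return dstSink.asFlux()
                .log("reactor.Flux.DESTINATION", Level.WARNING);
    });
}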
TestPublisher: https://github.com/reactor/reactor-core/blob/fa8cbbd1dbf634e414756ab5efb65ac2ae748ed0/reactor-test/src/test/java/reactor/test/publisher/DefaultTestPublisherTests.java#L45
Hi all,
I am trying to read messages from a WebFlux server but I can't.
This is my code:
@Test
public void testFindPersonsJson() throws Throwable {
    final WebClient client = WebClient
            .builder()
            .baseUrl("ws://baseUrl_blabla:8080")
            .defaultHeaders(httpHeaders -> {
                httpHeaders.set("Sec-WebSocket-Key", "xqBt3ImNzJbYqRINxEFlkg==");
                httpHeaders.set("Sec-WebSocket-Protocol", "chat, superchat");
                httpHeaders.set("Sec-WebSocket-Version", "13");
                httpHeaders.set("Sec-WebSocket-Extensions", "permessage-deflate");
                httpHeaders.setUpgrade("websocket");
                httpHeaders.setConnection("Upgrade");
                httpHeaders.setContentType(MediaType.APPLICATION_STREAM_JSON);
            })
            .build();
    final Waiter waiter = new Waiter();
    Flux<String> persons = client
            .get()
            .uri("/ws/persons?authorization=fsdadf_4235fefds-34fdfd")
            .retrieve()
            .bodyToFlux(String.class);
    persons.subscribe(person -> {
        waiter.assertNotNull(person);
        System.out.println("_____________Client subscribes: {}" + person);
        waiter.resume();
    });
    waiter.await(10000, 9);
}
I don't know how to get the response messages. They should look like this:
{"messageId":"43567gdjjsdf","messageType":"ABC","messagePayload":{"registered":true,"blocked":false}}
I can see my message in this log output:
16:11:04.304 [reactor-http-nio-2] DEBUG reactor.netty.channel.ChannelOperationsHandler - [___ id, L, R] No ChannelOperation attached. Dropping: +-------------------------------------------------+
|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 81 7e 00 99 7b 22 6d 65 73 73 61 67 65 49 64 22 |.~..{"messageId"|
|00000010| 3a 22 33 33 37 31 36 64 61 35 62 61 62 65 61 39 |:"33716da5babea9|
...................
but I can't get it handled. It seems that reactor.netty.channel.ChannelOperationsHandler logs my messages, but how can I get them?
Can someone please help me with that?
Thank you.
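A hedged guess at the root cause: WebClient speaks plain HTTP, so after the 101 upgrade the WebSocket frames have no handler attached and get dropped (which matches the "No ChannelOperation attached. Dropping" log line). Spring's reactive WebSocketClient performs the handshake itself; a sketch along those lines, reusing the URL and token from above:

// Sketch: ReactorNettyWebSocketClient handles the upgrade; each received frame's
// text payload is the JSON message the test was looking for.
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(
        URI.create("ws://baseUrl_blabla:8080/ws/persons?authorization=fsdadf_4235fefds-34fdfd"),
        session -> session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(System.out::println)
                .then())
        .block(Duration.ofSeconds(10));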
https://gitter.im/reactor/reactor?at=5d8d04c4290b8c354addc278
I was reading through the Flight of the Flux 3 blog post, and I saw that the recommended strategy for wrapping a blocking call is:
final Flux<String> betterFetchUrls(List<String> urls) {
    return Flux.fromIterable(urls)
            .flatMap(url ->
                    //wrap the blocking call in a Mono
                    Mono.fromCallable(() -> blockingWebClient.get(url))
                            //ensure that Mono is subscribed in a boundedElastic Worker
                            .subscribeOn(Schedulers.boundedElastic())
            ); //each individual URL fetch runs in its own thread!
}
But in the same article they used publishOn instead:
Flux.fromIterable(firstListOfUrls) //contains A, B and C
        .publishOn(Schedulers.boundedElastic())
        .map(url -> blockingWebClient.get(url))
        .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));
Based on that, wouldn't it be simpler to implement the betterFetchUrls method using publishOn, i.e.,
final Flux<String> betterFetchUrls(List<String> urls) {
    return Flux.fromIterable(urls)
            .publishOn(Schedulers.boundedElastic())
            .map(url -> blockingWebClient.get(url));
}
Am I missing something? The Reactor user manual in Appendix C also uses subscribeOn with a Mono, so I presume there's some advantage I'm missing.
Any insight would be appreciated.
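Not an official answer, just my reading of the difference, sketched below (slowCall is a hypothetical stand-in for the blocking client): publishOn moves all downstream work onto a single boundedElastic worker, so the blocking gets run one after another, while flatMap plus subscribeOn subscribes each inner Mono on its own worker, so the fetches run concurrently.

// Sequential: every map call runs on the one worker publishOn switched to,
// so total time is roughly N x call latency.
Flux.range(1, 3)
        .publishOn(Schedulers.boundedElastic())
        .map(i -> slowCall(i))
        .blockLast();

// Concurrent: each inner Mono gets its own boundedElastic worker, so total
// time is roughly one call latency.
Flux.range(1, 3)
        .flatMap(i -> Mono.fromCallable(() -> slowCall(i))
                .subscribeOn(Schedulers.boundedElastic()))
        .blockLast();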