All generic questions around Reactor. For advanced questions you can also try #reactor-core and #reactor-netty
I don't know why it appears only in one case; IMO it should appear in both.
Even if your operation is fire-and-forget, it's probably doing I/O. If you ever want to test specific use cases or propagate Reactor contexts, making it part of the pipeline is still a good idea, I think.
Now, depending on what your actual use case looks like, I'm sure people here could advise better choices of operators. I've suggested then because it's the first one that came to mind.
@Bean
fun audit(
    repository: AuditEventRepository
) = router {
    POST("/audit") { request ->
        Mono.defer { request.bodyToMono<AuditEvent>().map { repository.save(it) } }
            .retryWhen(Retry.backoff(3, Duration.ofMinutes(1)))
            .subscribeOn(Schedulers.elastic())
            .subscribe(
                { logger.info("Saved audit entity: $it") },
                { logger.error("Error saving audit entity", it) }
            )
        ServerResponse.ok().build()
    }
}
Calling 'Subscribe' in non-blocking scope
@Bean
fun audit(
    repository: AuditEventRepository
) = router {
    POST("/audit") { request ->
        request.bodyToMono<AuditEvent>()
            .doOnNext {
                Mono.fromCallable { repository.save(it) }
                    .retryWhen(Retry.backoff(3, Duration.ofMinutes(1)))
                    .subscribeOn(Schedulers.elastic())
                    .subscribe(
                        { logger.info("Saved audit entity: $it") },
                        { logger.error("Error saving audit entity", it) }
                    )
            }
            .flatMap { ServerResponse.ok().build() }
    }
}
A server MUST read the entire request message body or close the connection after sending its response, since otherwise the remaining data on a persistent connection would be misinterpreted as the next request.
@adrianhj88 Calling subscribe within a reactive operator does not give any guarantee about doSomething being finished before the operator is done. In this case it might not be a big deal because you're doing Mono.just. But even if you're aware of that limitation, it could behave in strange ways during tests: let's say a test checks the behavior of your reactive pipeline. The JVM might exit before doSomething is done, since nothing is waiting for its completion signal.
Because of that, and because newcomers tend to call subscribe instead of chaining operations with .then(), IntelliJ IDEA is treating that as a warning (and it was our advice to do so).
@Bean
fun audit(
    repository: AuditEventRepository
) = router {
    POST("/audit") { request ->
        Mono.defer { Mono.just(repository.save("hello")) }
            .subscribeOn(Schedulers.elastic())
            .subscribe()
        ServerResponse.ok().build()
    }
}
Calling 'Subscribe' in non-blocking scope
@Bean
fun audit(
    repository: AuditEventRepository
) = router {
    POST("/audit") { request ->
        Mono.fromCallable { repository.save("hello") }
            .subscribeOn(Schedulers.elastic())
            .then(ServerResponse.ok().build())
    }
}
To complete the "Calling subscribe within a reactive operator" explanation, let's put it another way. When a request comes in, if you call subscribe() on something consuming the request body while returning the response independently, it would be like, in the Servlet world, returning the response and yet kicking off work in a new thread that's not monitored/managed by anything. If your app shuts down before the work is done, everything is lost. If that job is operating on the request data directly and the memory associated with the request is "recycled" by the server, your job will fail on invalid data.
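Putting those two points together, here is roughly what the "keep it in the pipeline" shape looks like in Java (a sketch only; AuditEvent, AuditEventRepository, and the retry settings are borrowed from the Kotlin examples above): the body is consumed and the save performed as part of the returned response Mono, so the framework waits for the work instead of racing it.
RouterFunction<ServerResponse> audit = route(POST("/audit"), request -> request
        .bodyToMono(AuditEvent.class)
        // the blocking save runs off the event loop, but inside the pipeline
        .flatMap(event -> Mono.fromCallable(() -> repository.save(event))
                .retryWhen(Retry.backoff(3, Duration.ofMinutes(1)))
                .subscribeOn(Schedulers.elastic()))
        // the response completes only after the body is consumed and the save is done
        .then(ServerResponse.ok().build()));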
Hi, currently I need some help with GroupedFlux. What I want is to group messages by id and then process a specific number of messages for each id, let's say 3 messages for each id.
I have the following code:
Flux.fromIterable(messages)
    .groupBy { it.key }
    .concatMap { it.collectList() }
    .flatMap { Flux.fromIterable(it.take(3)) }
    .subscribe { println(it.key) }
This works; however, if the grouped fluxes contain a lot of messages, the collectList() takes very long. As I am only interested in 3 messages of each group, this seems unnecessary, but I cannot find a proper way to prevent it.
Something like ...concatMap { it.take(3).collectList() } always results in the following error:
Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.
Does anyone have an idea how this can be done?
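One stateful alternative that avoids collecting the groups at all (a sketch in Java; Message and key are the names from the snippet, and the mutable map is only safe here because the flux is sequential): count occurrences per key and let at most 3 per key through.
Map<String, Integer> seenPerKey = new HashMap<>();
Flux.fromIterable(messages)
    // pass the first 3 messages of each key, skip the rest
    .filter(m -> seenPerKey.merge(m.key, 1, Integer::sum) <= 3)
    .subscribe(m -> System.out.println(m.key));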
Hi, I have this code to calculate a regularly updating status as a hot producer.
Flux.concat(initialAggregatedStatus(),
        Flux.interval(Duration.ZERO, properties.getUpdateInterval())
            .onBackpressureDrop(intervalCount -> {
                log.warn("Too many connections pending, dropped status update request {}", intervalCount);
                droppedUpdateCounter.increment();
            })
            .concatMap(ignore -> updateStatus()))
    .subscribeOn(Schedulers.parallel())
    .cache(1)
What is the correct way to stop this? takeWhile(ignore -> running), or .takeUntilOther(killSwitch) with private DirectProcessor<String> killSwitch = DirectProcessor.create();
Flux.concat(initialAggregatedStatus(),
        Flux.interval(Duration.ZERO, properties.getUpdateInterval())
            .onBackpressureDrop(intervalCount -> {
                log.warn("Too many connections pending, dropped status update request {}", intervalCount);
                droppedUpdateCounter.increment();
            })
            .concatMap(ignore -> updateStatus()))
    .takeWhile(ignore -> running)
    .subscribeOn(Schedulers.parallel())
    .cache(1);
Or something else?
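One difference worth noting: takeWhile(ignore -> running) is only evaluated when the next status element comes through, so cancellation waits for the following update, while takeUntilOther cancels as soon as the kill switch emits. A sketch of the latter, reusing the DirectProcessor from the question (updates() stands in for the interval/concatMap part above):
Flux.concat(initialAggregatedStatus(), updates())
    .takeUntilOther(killSwitch)   // cancels immediately when killSwitch emits
    .subscribeOn(Schedulers.parallel())
    .cache(1);

// later, to stop:
killSwitch.onNext("stop");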
Hi, I'm looking for the correct pattern to do the following:
I need to do 3 or more operations against an external system. The inputs and outputs of each operation are different from one another, AND subsequent operations depend on the outputs of previous operations.
E.g. A -> B -> C -> D
Each following step checks a response code of the previous step.
If not successful it should not continue with the rest of the processing, but return the response of the previous operation.
If successful, it should use data from the previous response and perform the next action.
I am trying to avoid the scenario where I have the following nesting (pseudocode):
doA.flatMap { r ->
    if (r.code != 0) return@flatMap Mono.just(r)
    doB.flatMap { r ->
        if (r.code != 0) return@flatMap Mono.just(r)
        doC.flatMap { r ->
            if (r.code != 0) return@flatMap Mono.just(r)
            etc, etc, etc, etc
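One way to flatten that nesting is a small short-circuit helper, sketched here in Java (proceed, Response, doA/doB/doC/doD, and the code field are hypothetical names standing in for the pseudocode's types):
// returns the failed response unchanged, otherwise runs the next step
static Mono<Response> proceed(Response prev, Function<Response, Mono<Response>> next) {
    return prev.code != 0 ? Mono.just(prev) : next.apply(prev);
}

doA()
    .flatMap(a -> proceed(a, this::doB))
    .flatMap(b -> proceed(b, this::doC))
    .flatMap(c -> proceed(c, this::doD));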
onNext should block (until it's drained) but it doesn't. Instead it blocks on inserting the 17th element. Why does it successfully insert 16 elements? Why 16? v3.3.10.RELEASE.
public static void main(String[] args) {
    EmitterProcessor<Integer> processor = EmitterProcessor.create(10);
    for (int i = 1; i <= 20; i++) {
        processor.onNext(i);
        System.out.println("sent: " + i);
    }
    processor.subscribe(System.out::println);
}
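The 16 comes from queue sizing rather than from EmitterProcessor itself: the bufferSize passed to create(...) backs an internal queue obtained from Queues.get(bufferSize), and small capacities are rounded up to the next power of two. A quick way to check (reactor.util.concurrent.Queues is part of reactor-core):
System.out.println(Queues.ceilingNextPowerOfTwo(10)); // prints 16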
Hi, I want help in understanding whether we can maintain trace/span ids in the entire pipeline.
For example:
Flux.just(1, 2, 3)
    .map(i -> i * 2)
    .filter(i -> i > 3)
    .subscribe(System.out::println);
I want to send a signal all the way downstream, a kind of unique id (span/trace id) for each element emitted. I am guessing it becomes tricky in case of flatMaps or schedulers? I tried using Hooks and MutableContext. I don't want to change the existing pipeline; that's why I thought Hooks were best for my problem. Any idea is highly appreciated.
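One caveat: the Reactor Context is tied to a subscription, not to individual elements, so it can carry one trace id per subscriber but not one per emitted element; a per-element id has to travel with the element itself (e.g. wrapped in a tuple). A sketch of the per-subscription variant, using the 3.3.x API (subscriberContext was later renamed):
Flux.just(1, 2, 3)
    .map(i -> i * 2)
    .filter(i -> i > 3)
    // read the Context without restructuring the rest of the pipeline
    .flatMap(i -> Mono.subscriberContext()
            .map(ctx -> ctx.get("traceId") + " -> " + i))
    .subscriberContext(Context.of("traceId", "abc-123"))
    .subscribe(System.out::println);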
hello!
I need to take an object, send it to a server (using a WebClient), log the response, then do some work with it
Mono.just(something)
    .doOnNext(this::sendAsyncAndLogResponse)
    .doOnNext(this::doSomeWork)
    .subscribe()
Problem is I don't see the server response in the logs
I assume I should not be calling WebClient.exchange() inside a .doOnNext()
What would be the correct way to do this?
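Right: doOnNext is for side effects and ignores any Publisher the callback produces, so the exchange is never subscribed. Chaining with flatMap keeps the call in the pipeline. A sketch (webClient, the /endpoint URI, and doSomeWork are stand-ins for the question's own names):
Mono.just(something)
    .flatMap(body -> webClient.post().uri("/endpoint")
            .bodyValue(body)
            .retrieve()
            .bodyToMono(String.class)
            // logging stays a side effect, but on the actual response
            .doOnNext(response -> logger.info("server response: {}", response)))
    .map(this::doSomeWork)
    .subscribe();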
Hi, I am looking for a recommendation on (groupBy & subscribe) vs (flux multicast and filter) to take different actions on different event types on the flux. Could someone please help?
https://stackoverflow.com/questions/61814106/choice-between-groupby-and-filter-with-infinite-streams-in-reactor
I am starting with reactive development, and I believe the approach is to share the flux and filter it in each of the pipelines. I understand that the thread count would be lower, but would this not increase CPU usage, as the computations are a lot higher? (I would use a switch case in the regular programming model.)
The incoming messages are of the type:
{
    "messageType": "lightconfig",
    "state": "on"/"off",
    "deviceId": "A0001"
}
I guess there would be like 15-20 categories eventually. So the common part is retrieving the device details. After that, I see 2 options:
1. Do the common part of the pipeline and share the observable. Then each subscriber can listen to this. Say there are 20 observers and 100 events: we would be running the filter computation 2000 times.
2. Do the common part of the pipeline, use group-by to group the observable, and subscribe to each grouped observable with an observer. The observer will receive the Map<messageType, Observable<Message>>.
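For comparison, the groupBy variant makes the routing decision once per message instead of once per subscriber. A sketch (messages, messageType, and the handlers map are assumptions based on the payload above); note that with an infinite stream, every group must actually be consumed, or groupBy can stall:
messages
    .groupBy(m -> m.messageType)
    // one handler lookup per group, not one filter per subscriber per message
    .flatMap(group -> group.flatMap(m -> handlers.get(group.key()).handle(m)))
    .subscribe();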
Hi, I am using reactor.retry.Retry from the io.projectreactor.addons:reactor-extra:3.3.4.RELEASE lib to include retries in my reactive chain. Everything works as expected, but when I run against BlockHound I get a blocking exception as follows. Is there any recommended way to fix this, or is it a bug in the library itself?
at java.io.FileInputStream.readBytes(FileInputStream.java)
at java.io.FileInputStream.read(FileInputStream.java:255)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at java.time.zone.TzdbZoneRulesProvider.load(TzdbZoneRulesProvider.java:187)
at java.time.zone.TzdbZoneRulesProvider.<init>(TzdbZoneRulesProvider.java:113)
at java.time.zone.ZoneRulesProvider$1.run(ZoneRulesProvider.java:157)
at java.security.AccessController.doPrivileged(Native Method)
at java.time.zone.ZoneRulesProvider.<clinit>(ZoneRulesProvider.java:144)
at java.time.ZoneRegion.ofId(ZoneRegion.java:120)
at java.time.ZoneId.of(ZoneId.java:411)
at java.time.ZoneId.of(ZoneId.java:359)
at java.time.ZoneId.of(ZoneId.java:315)
at java.util.TimeZone.toZoneId(TimeZone.java:556)
at java.time.ZoneId.systemDefault(ZoneId.java:274)
at reactor.scheduler.clock.SchedulerClock.of(SchedulerClock.java:166)
at reactor.retry.AbstractRetry.<init>(AbstractRetry.java:66)
at reactor.retry.DefaultRetry.<init>(DefaultRetry.java:48)
at reactor.retry.DefaultRetry.create(DefaultRetry.java:58)
at reactor.retry.Retry.allBut(Retry.java:107)
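For context: the blocking call is the JDK's one-time timezone database load (TzdbZoneRulesProvider's static initializer), which reactor-extra triggers via SchedulerClock calling ZoneId.systemDefault(). Two commonly suggested workarounds, sketched here without any claim to being the official fix: force the load eagerly on a thread where blocking is fine, or whitelist that one-time load in BlockHound:
// Option 1: trigger the tzdb load at startup, before any reactive code runs
ZoneId.systemDefault();

// Option 2: tell BlockHound this one-time initialization is acceptable
BlockHound.install(builder -> builder
        .allowBlockingCallsInside("java.time.zone.TzdbZoneRulesProvider", "load"));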
Hi, I'm using the Spring Framework org.springframework.web.reactive.socket.WebSocketSession, and I create a reactor.core.publisher.UnicastProcessor to send messages, like this:
publisher = UnicastProcessor.create();
session.send(publisher).subscribe();
// [...] stuff
WebSocketMessage message = new WebSocketMessage(WebSocketMessage.Type.BINARY, dataBuffer);
publisher.onNext(message);
It is working perfectly; however, I'm curious what happens if I produce too many messages and my network can't keep up with the sent messages.
I tried to search on Google, but no luck, so any help is appreciated.
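As far as I know, UnicastProcessor.create() with no arguments buffers onNext signals in an unbounded queue, so a slow network just makes that buffer (and the heap) grow; nothing pushes back on the producing code. A bounded variant exists; a sketch against the 3.x API (the queue size and drop handling here are arbitrary assumptions):
UnicastProcessor<WebSocketMessage> publisher = UnicastProcessor.create(
        Queues.<WebSocketMessage>get(256).get(),             // bounded internal buffer
        dropped -> log.warn("buffer full, dropped a frame"), // overflow callback
        () -> { });                                          // onTerminate callback
session.send(publisher).subscribe();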
For reactive GridFS: it seems to run out of memory. I am not able to apply backpressure... any hints, please?
for (Map<String, Object> metadata : mdList) {
    saveObject(dataBuffer, metadata)
        .onErrorContinue((e, a) -> log.error(e.getMessage()))
        .subscribe(new BackpressureReadySubscriber<>());
}
public class BackpressureReadySubscriber<T> extends BaseSubscriber<T> {

    int limit = 100;
    int factor = limit;
    int delay = 5000;
    int consumed;

    @Override
    protected void hookOnError(Throwable throwable) {
        log.error(throwable.getMessage());
    }

    @Override
    public void hookOnSubscribe(Subscription subscription) {
        // log.info("Size of request" + factor);
        // initial request, but why is it called every time?
        // further request data
        request(factor);
    }

    @Override
    @SneakyThrows
    public void hookOnNext(T value) {
        consumed++;
        log.info("Consumed" + consumed);
        if (consumed == limit) {
            consumed = 0;
            log.info("Sleeping for " + delay / 1000 + " sec");
            Thread.sleep(delay);
            log.info("woke up after " + delay / 1000 + " sec");
            request(factor);
        }
    }
}
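One thing that stands out in the loop above: each saveObject(...) gets its own subscriber, so request(factor) only throttles within a single save, never across the whole batch. Driving everything through one pipeline gives the subscriber's requests something to push back on; a sketch reusing the question's own names:
Flux.fromIterable(mdList)
    // concatMap subscribes to one save at a time, honoring downstream demand
    .concatMap(metadata -> saveObject(dataBuffer, metadata)
            .onErrorContinue((e, a) -> log.error(e.getMessage())))
    .subscribe(new BackpressureReadySubscriber<>());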
I want to set up Context values for all publisher chains. This is what I came up with; not sure if I'm terribly ruining everything by doing so:
Hooks.onLastOperator("test-hook", publisher -> {
    if (publisher instanceof Mono) {
        return ((Mono<Object>) publisher).subscriberContext(SomeClass::initContext);
    }
    return ((Flux<Object>) publisher).subscriberContext(SomeClass::initContext);
});