All generic questions around Reactor. For advanced questions you can also try #reactor-core and #reactor-netty
I am using Spring Cloud Gateway and using its GatewayFilter to do some pre- and post-processing on all requests. In the pre-filter, on some condition, I want to make an HTTP request via Retrofit and return that response directly instead of forwarding to the upstream service.
This is what I found on how the response can be modified before being sent back to the client. Can someone help me return some dummy string in the response via this API? https://github.com/spring-cloud/spring-cloud-gateway/blob/00a1f5b7712357c53d716538bca9006cdfc63112/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/WebClientWriteResponseFilter.java#L53
I have a Flux<Pair<Int, VALUE>> and it emits items such as (1, One), (1, Two), (2, Three), (1, Four), (2, Five). There are separate consumers for each Int. I want all the 1s and 2s to be processed in parallel, but items of each group must be processed sequentially. Should I use .window or .groupBy? The cardinality of Int is reasonable, less than 300, but I'm concerned about buffering of items within each group; they can be pretty big.
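For what it's worth, a minimal sketch of the groupBy approach (plain reactor-core, with Tuple2 standing in for your Pair and a toy transform in place of the real consumers): flatMap subscribes to every group eagerly, so groups proceed in parallel, while concatMap inside each group keeps per-group processing sequential. One caveat called out in the groupBy javadoc: the flatMap concurrency must cover every simultaneously open group (the default is 256, so with ~300 keys you'd want to raise it), or the pipeline can stall.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class GroupedProcessing {

    // Groups run in parallel (flatMap); items within a group stay ordered (concatMap).
    static Flux<String> process(Flux<Tuple2<Integer, String>> source, int expectedGroups) {
        return source
                .groupBy(Tuple2::getT1)
                // concurrency must be >= the number of open groups, or groupBy can hang
                .flatMap(group -> group
                                .concatMap(item ->
                                        // stand-in for the real per-item consumer
                                        Flux.just(group.key() + ":" + item.getT2())),
                        Math.max(expectedGroups, 256));
    }

    public static void main(String[] args) {
        List<String> out = process(Flux.just(
                        Tuples.of(1, "One"), Tuples.of(1, "Two"), Tuples.of(2, "Three"),
                        Tuples.of(1, "Four"), Tuples.of(2, "Five")), 300)
                .collectList()
                .block();
        System.out.println(out);
    }
}
```

Note that groupBy does buffer per group under backpressure, so if a single group's consumer is slow and its items are large, window(ing) per key or a dedicated queue per key may be worth benchmarking instead.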
Anyone familiar with Spock here? I'm taking a class on Reactive Spring, and I've been able to use Spock no problem so far, but now there's a test using StepVerifier.withVirtualTime()
and it passes no matter what. Here's the whole test:
def "With virtual time"() {
    given:
    VirtualTimeScheduler.getOrSet()
    def longFlux = Flux.interval(Duration.ofSeconds(1))
            .take(3)
            .log()

    expect:
    StepVerifier.withVirtualTime({ longFlux.log() })
            .expectSubscription()
            .thenAwait(Duration.ofSeconds(3))
            .expectNext(0L, 1L, 2L)
            .verifyComplete()
}
I also don't see the logs from the flux. Any ideas?
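Not a Spock expert, but one thing the Reactor reference docs call out for withVirtualTime is that the sequence must be instantiated lazily inside the supplier, because (as I understand it) withVirtualTime installs its own VirtualTimeScheduler; a flux built beforehand, even after getOrSet() in given:, may be running on a different scheduler than the one thenAwait advances. A sketch of the documented pattern (shown in Java; the Groovy closure form should be equivalent):

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class VirtualTimeExample {
    public static void main(String[] args) {
        // Build the interval *inside* the supplier so it picks up the
        // VirtualTimeScheduler that withVirtualTime installs.
        StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1))
                        .take(3)
                        .log())
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(3))
                .expectNext(0L, 1L, 2L)
                .verifyComplete();
        System.out.println("virtual time sequence verified");
    }
}
```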
Hello folks, I have some code written in imperative style, where I do some database checks on a ClientInfo entity using the mac address and username. If it exists in the DB, I do one thing; if it doesn't, I do something else and return the ClientInfoModel object, which is just the DTO. My previous code is as follows.
public ClientInfoModel registerClient(ClientInfoModel clientInfoModel) {
    Optional<ClientInfo> clientInfoOptional = clientInfoRepository
            .findByMacAndUsername(clientInfoModel.getMac(), clientInfoModel.getUsername());
    ClientInfo clientInfo;
    if (clientInfoOptional.isEmpty()) {
        clientInfo = new ClientInfo();
    } else {
        clientInfo = clientInfoOptional.get();
    }
    BeanUtils.copyProperties(clientInfoModel, clientInfo);
    clientInfoRepository.save(clientInfo);
    return clientInfoModel;
}
Now I want to change this code to reactive style. I am using the R2DBC driver for MariaDB and its template instance. Note that I have another DB call to save the object after this check. Can someone show me the appropriate way to write this in reactive style? Here is my attempt:
public Mono<ClientInfoModel> registerClient(ClientInfoModel clientInfoModel) {
    Mono<ClientInfo> clientInfoMono = template.selectOne(
            query(
                    where("mac").is(clientInfoModel.getMac())
                            .and("username").is(clientInfoModel.getUsername())
            ), ClientInfo.class);
    return clientInfoMono.zipWith(clientInfo -> {
        if (clientInfo != null) {
            BeanUtils.copyProperties(clientInfoModel, clientInfo);
            clientInfoRepository.save(clientInfo);
        } else {
            ClientInfo clientInfo1 = new ClientInfo();
            BeanUtils.copyProperties(clientInfoModel, clientInfo1);
            clientInfoRepository.save(clientInfo1);
        }
        return Mono.just(clientInfoModel);
    });
}
Note that it is giving me an error on the call to BeanUtils.copyProperties(source, target); also, I think the save method is not being called in proper reactive style.
Hi @mainul35,
public Mono<ClientInfoModel> registerClient(ClientInfoModel clientInfoModel) {
    return this.template.selectOne(query(where("mac").is(clientInfoModel.getMac()).and("username").is(clientInfoModel.getUsername())), ClientInfo.class)
            .flatMap(clientInfo -> this.clientInfoRepository.save(this.mapToEntity(clientInfo, clientInfoModel)))
            .switchIfEmpty(this.clientInfoRepository.save(this.mapNewToEntity(clientInfoModel)));
}
In your mapping function, you mustn't lose the reference to your entity object. Try to use MapStruct to avoid using reflection with BeanUtils.copyProperties(clientInfoModel, clientInfo1);
@Mapper(componentModel = "spring")
public interface ClientInfoMapper {
    Client toEntity(@MappingTarget final Client entity, final ClientInfoModel dto);
}
Thank you so much ^_^
I got a NoClassDefFoundError: reactor/util/retry/Retry error. Referring to the reactor/reactor-netty#1207 issue, I added <reactor-bom.version>Dysprosium-SR9</reactor-bom.version> to my pom.xml, but it doesn't work for me. Did I miss anything? Thanks!
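One thing to check: setting that property only has an effect if something in your build actually consumes it (the Spring Boot parent/dependencies POM does; a plain pom.xml doesn't). If that's not your setup, the usual way to pin Reactor versions is to import the BOM in dependencyManagement; a sketch, assuming Maven:

```xml
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Dysprosium-SR9</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
```

Also, if I remember right, reactor.util.retry.Retry only exists from reactor-core 3.3.4 onward, so it's worth running mvn dependency:tree to see whether an older reactor-core is still being pulled in by something else.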
Hello folks, I have the controller code below, which I am trying to rewrite in reactive style. Can someone guide me on the best practice? My goal with this code is to always give the front-end developer a consistent response shape. Please correct me if I am wrong.
@PostMapping("/register")
public ResponseEntity<?> registerClient(@RequestBody ClientInfoModel clientInfoModel) {
    ResponseModel<ClientInfoModel> responseModel = new ResponseModel<>();
    responseModel.setMessage("Successfully registered the client");
    responseModel.setData(clientInfoService.registerClient(clientInfoModel));
    return ResponseEntity.ok(responseModel);
}
Note that clientInfoService.registerClient(clientInfoModel) returns a Mono of ClientInfoModel.
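Not authoritative, but the usual shape is to keep everything inside the Mono: map the service result into your wrapper and return Mono<ResponseEntity<ResponseModel<ClientInfoModel>>> (or just Mono<ResponseModel<...>>) from the controller, so nothing blocks. A stripped-down sketch of the mapping with stand-in classes (no Spring types, so only the reactive part is shown; ResponseModel and the service are mocked here):

```java
import reactor.core.publisher.Mono;

public class RegisterSketch {

    // Stand-in for the ResponseModel wrapper from the controller snippet.
    static class ResponseModel<T> {
        String message;
        T data;
    }

    // Stand-in for clientInfoService.registerClient(...) returning a Mono.
    static Mono<String> registerClient(String model) {
        return Mono.just(model);
    }

    // In the real controller this would return
    // Mono<ResponseEntity<ResponseModel<ClientInfoModel>>> and end with
    // .map(ResponseEntity::ok).
    static Mono<ResponseModel<String>> register(String clientInfoModel) {
        return registerClient(clientInfoModel)
                .map(saved -> {
                    ResponseModel<String> responseModel = new ResponseModel<>();
                    responseModel.message = "Successfully registered the client";
                    responseModel.data = saved;
                    return responseModel;
                });
    }

    public static void main(String[] args) {
        ResponseModel<String> r = register("client-1").block();
        System.out.println(r.message + " / " + r.data);
    }
}
```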
I'm trying to write a TcpServer that can handle a certain protocol X. X is a rather chatty protocol, and involves multiple calls-and-responses from a client within a single transaction. There is no set number of these (several of them are repeatable/skippable), but they are ordered and should share context (i.e., info sent in step 1 should be accessible in step 2). We have no control over X itself. How would one go about framing this in this library? I can think of the following for organization, but I haven't had much luck getting anything to work yet.

TcpServer.create()
        .host(HOST)
        .port(PORT)
        .doOnConnection(connection -> {
            // Could add chain of handlers here, one for each step. Would this be reactive? Is there a way to share context?
        })
        .handle((in, out) -> {
            // AFAIK we can only call in.receive() once here. Is there a good way to expand this to multiple steps?
        })
        .bindNow();
Hi all, I am new to Spring WebFlux and the Reactor framework. When a user calls my endpoint and the handler I have written works asynchronously, how can I return the result back to the user?
Here is handler code:
public Mono<ServerResponse> listAll(ServerRequest serverRequest) {
    return ServerResponse.ok().build().doOnNext(r -> Mono.fromCallable(() -> {
                // DAO is synch library included in API service
                return userDAO.findAll();
            }).log()
            .subscribeOn(Schedulers.parallel())
            .subscribe(System.out::println));
}
Routing file:
public RouterFunction<ServerResponse> route(UserHandler userHandler) {
    return RouterFunctions
            .route(GET("/api/v1.0/users").and(accept(MediaType.APPLICATION_JSON)), userHandler::listAll);
}
The client gets an immediate 200 OK; however, I would like to send the result to the client once I get the result from the DB, in async fashion. Any suggestions?
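The immediate 200 happens because the handler builds an empty response and only subscribes to the DAO call as a side effect; nothing ties the DB result to the response. The usual fix is to wrap the blocking call in Mono.fromCallable, shift it to boundedElastic, and flatMap it into the body, roughly Mono.fromCallable(userDAO::findAll).subscribeOn(Schedulers.boundedElastic()).flatMap(users -> ServerResponse.ok().bodyValue(users)). The core pattern, minus the Spring types, sketched with a stand-in DAO:

```java
import java.util.List;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingDaoSketch {

    // Stand-in for the synchronous userDAO.findAll().
    static List<String> findAll() {
        return List.of("alice", "bob");
    }

    // Wrap the blocking call so it runs off the event loop. The WebFlux
    // handler then flatMaps this Mono into the ServerResponse body instead
    // of subscribing to it as a side effect.
    static Mono<List<String>> findAllAsync() {
        return Mono.fromCallable(BlockingDaoSketch::findAll)
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        System.out.println(findAllAsync().block());
    }
}
```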
@LoadBalanced
@Bean
WebClient.Builder webClientBuilder() {
    return WebClient.builder();
}
I have some Reactor Kafka code that reads in events via a KafkaReceiver and writes one-to-many downstream messages via one or more KafkaSenders that are concatenated into a single Publisher. Everything is working great, but what I'd like to do is only emit an event from this concatenated senders Flux when it is complete (i.e., it's done writing to all downstream topics, not emitting something for each element as it writes to Kafka downstream). This way I could sample() and periodically commit offsets, knowing that whenever sample() happens to trigger and I commit offsets for an incoming event, I've processed all downstream messages for every event I'm committing offsets for. It seems like I could use either pauseUntilOther() or then() somehow, but I don't quite see exactly how. Anyone have any thoughts?
Main Publisher code:
this.kafkaReceiver.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(this.scheduler)
.flatMap(this::processEvent)
.sample(Duration.ofMillis(10))
.concatMap(sr -> commitReceiverOffset(sr.correlationMetadata())))
.subscribe();
Concatenated KafkaSenders returned by the call to processEvent():

return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
        .doOnComplete(() -> LOG.info("Finished writing all downstream messages for incoming event"));