All generic questions around Reactor. For advanced questions you can also try #reactor-core and #reactor-netty
I would like to share with you the Practical Reactor workshop that I have created. If you like a practical, hands-on approach to learning, or you want to improve your existing Project Reactor skills, this workshop is for you. It contains over 100 unique exercises and covers most Project Reactor operators and semantics.
https://github.com/schananas/practical-reactor
If you decide to try it out and notice any issues, have an idea for an interesting example or exercise, or have feedback, please share it, either here or by reporting an issue on GitHub.
Hello, I am new to Project Reactor. Here is my code: it's just a simple update operation and the functionality works fine, but I'm struggling to make it cleaner and more readable. Since I'm new to the Reactor world, can someone help me make it more concise?
public Mono<ServerResponse> updateStudent(Mono<Student> student) {
    return student.flatMap(newStudent -> {
        if (newStudent.getPin() == null) {
            Mono<Boolean> isExists = validateEmailExistsorNot(newStudent.getEmailId());
            return isExists.flatMap(result -> {
                if (!result) { // create record
                    return seqGenRepo.generateSequence().flatMap(seqId -> {
                        newStudent.setPin(seqId);
                        return studentRepo.updateRecord(newStudent)
                            .flatMap(newRecord ->
                                ServerResponse.status(HttpStatus.CREATED)
                                    .contentType(MediaType.APPLICATION_JSON)
                                    .body(BodyInserters.fromValue("record has been created"))
                            );
                    });
                } else {
                    return ServerResponse.status(HttpStatus.BAD_REQUEST)
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(BodyInserters.fromValue("duplicate Email Id"));
                }
            });
        } else { // update record
            return studentRepo.updateRecord(newStudent)
                .flatMap(updatedRecord ->
                    ServerResponse.status(HttpStatus.CREATED)
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(BodyInserters.fromValue("record has been updated"))
                );
        }
    });
}
We're getting data from a gRPC service, and I implemented a "messageListener" that listens to the gRPC data stream and turns the data into a Flux. The code looks like this:
public Flux<Map<String, Object>> getMetricsAsFlux(Metric.MetricGetQuery query) {
    MessageListener<Metric.MetricGetReply> messageListener = new MessageListener<>();
    iqApiStub.withCallCredentials(callCredentials).metricGet(query, messageListener);
    return messageListener.getFlux()
        .map(IqarosGRPCMapper::toObjectMap);
}
Then I realized I need the ability to retry the query on certain errors and I came up with this:
public Flux<Map<String, Object>> getMetricsAsFlux(Metric.MetricGetQuery query) {
    MessageListener<Metric.MetricGetReply> messageListener = new MessageListener<>();
    Flux<Map<String, Object>> flux = messageListener.getFlux()
        .doOnSubscribe(sub -> {
            iqApiStub.withCallCredentials(callCredentials).metricGet(query, messageListener);
        })
        .map(IqarosGRPCMapper::toObjectMap);
    return flux;
}
I'm quite happy this is working, if I get an error we can do a retry and that will trigger a new subscription and a new query to the grpc service. However, I fear I'm abusing the API here. Can you suggest some other (and better) patterns?
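One pattern that might fit, sketched under assumptions: issue the call inside Flux.create, whose callback runs once per subscriber, so a retry naturally triggers a fresh query without a doOnSubscribe side effect. The Listener interface and the metricGet method below are hypothetical stand-ins for MessageListener and the iqApiStub call (the real types are not shown in this thread). The fake service fails on its first call only so that the retry is visible.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class RetryableQuerySketch {

    // Hypothetical stand-in for the gRPC stream callbacks.
    interface Listener<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    // Counts how many times the fake service has been called.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for the stub call: fails once, then streams two values.
    static void metricGet(String query, Listener<String> listener) {
        if (calls.incrementAndGet() == 1) {
            listener.onError(new IllegalStateException("transient failure"));
            return;
        }
        listener.onNext(query + "-a");
        listener.onNext(query + "-b");
        listener.onCompleted();
    }

    // The lambda passed to Flux.create runs for every subscription,
    // so each retry creates a new listener and issues a new call.
    static Flux<String> getMetricsAsFlux(String query) {
        return Flux.<String>create(sink -> metricGet(query, new Listener<String>() {
            @Override public void onNext(String value) { sink.next(value); }
            @Override public void onError(Throwable error) { sink.error(error); }
            @Override public void onCompleted() { sink.complete(); }
        }));
    }
}
```

With this shape, getMetricsAsFlux("q").retry(1) subscribes a second time after the first error, and the second subscription gets its own listener rather than sharing state with the failed one.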
Flux<UserArtistsDto> userArtistsFlux = fetchSimilarProfilesBasedOnProfiles.getRelatedProfilesByArtists(
    "Travis Scott", "G-Eazy", "Hole", "blackbear", "no", "prathamkrishna"
);
List<UserProfileArtistsDto> list = new ArrayList<>();
Flux<UserProfileArtistsDto> mapFlux =
    userArtistsFlux
        .flatMapSequential(val -> {
            UserProfileArtistsDto userProfileArtistsDto = new UserProfileArtistsDto();
            userProfileArtistsDto.setUsername(val.getUsername());
            userProfileArtistsDto.setUserprofilelink(val.getUserprofilelink());
            fetchSimilarProfilesBasedOnProfiles.getUserArtist(val.getUsername()).subscribe(artist -> {
                if (userProfileArtistsDto.getArtists() == null) {
                    userProfileArtistsDto.setArtists(new ArrayList<>() {{
                        add(artist);
                    }});
                } else {
                    userProfileArtistsDto.getArtists().add(artist);
                }
            });
            list.add(userProfileArtistsDto);
            return Flux.fromIterable(list);
        });
return ServerResponse.ok()
    .contentType(MediaType.TEXT_EVENT_STREAM)
    .body(mapFlux, UserProfileArtistsDto.class);
subscribe() but I know that's not recommended since it short-circuits backpressure. What about publishing the write operations to a shared sink and having the subscriber perform the cache write operations?
Hey, I'm trying to implement backpressure in my quiz system.
Requirement: multiple quizzes are running, and each user subscribes to a single quiz and takes it. But when I publish an event, it is sent to all quizzes that are running at that time. Can anybody help me out?
private final Sinks.Many<Response> sink;
publishing -> sink::tryEmitNext
@Bean
public Sinks.Many<Response> sink() {
    return Sinks.many().replay().latest();
}
@Bean
public Flux<Response> flux(Sinks.Many<Response> sink) {
    return sink.asFlux();
}
public Mono<ServerResponse> foo(ServerRequest request) {
    return request.bodyToMono(Test.class).flatMap(regVer -> {
        return this.handlerV1Repo.findDuplicateByToken(regVer.getToken()).flatMap(exsist -> {
            if (exsist) {
                return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(new DedubRes(
                    RES_STATUS.FAIL.getCode(), Arrays.asList(new ErrorModel("", "found duplicate"))));
            } else {
                return this.handlerV1Repo.saveRepo(regVer).flatMap(sData -> {
                    return ServerResponse.status(HttpStatus.CREATED).contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(new DedubRes(RES_STATUS.SUCCESS.getCode(), Collections.emptyList()));
                }).switchIfEmpty(ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(new DedubRes(
                    RES_STATUS.FAIL.getCode(), Arrays.asList(new ErrorModel("", "unable to insert data")))));
            }
        });
    });
}
How can I restructure this code with method chaining?
Hi everybody,
I'm not sure if Reactor is applicable in my case:
I have a TCP server that needs to do the following:
TcpServer.create()
    .handle((in, out) -> {
        Flux<String> flux = Flux.create(sink -> {
            for (int i = 0; i < 10; i++) {
                sink.next("i = " + i + "\n");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        return out.sendString(flux).neverComplete();
    })
    .bindNow();
I suppose that in place of flux I can call any code, even blocking code.
Have a good evening and thanks in advance for your help!
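On the blocking part, a minimal sketch of two options, assuming the goal is to keep the sleep loop off the thread that subscribes (in a Netty handler that is typically an event loop thread, and blocking it stalls other connections). The class and method names are illustrative only. The first method replaces the sleep with Flux.interval; the second keeps the blocking loop but moves the subscription to Schedulers.boundedElastic().

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class BlockingSourceSketch {

    // Non-blocking variant: a timer drives the emissions instead of Thread.sleep.
    static Flux<String> ticking(Duration period, int count) {
        return Flux.interval(period)
            .take(count)
            .map(i -> "i = " + i + "\n");
    }

    // Blocking variant: the loop still sleeps, but subscribeOn makes the
    // Flux.create callback run on a boundedElastic worker thread.
    static Flux<String> blocking(int count, long sleepMillis) {
        return Flux.<String>create(sink -> {
            for (int i = 0; i < count; i++) {
                sink.next("i = " + i + "\n");
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop emitting.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            sink.complete();
        }).subscribeOn(Schedulers.boundedElastic());
    }
}
```

Either Flux could be handed to out.sendString(...) in place of the original one. Note that the sketch also calls sink.complete(), which the original loop never does.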
I also have a question that I have several answers to. I'm just looking for advice on the best practice.
I'm doing some gRPC stuff and have methods like
Mono<Response> convertSingle(Mono<Request> request);
and
Flux<Response> convertMany(Flux<Request> requests);
Request is 1:1 with Response. They do the same thing so I want to not write the same code twice. What would be the preferred way to call one method from the other in order to convert?
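One way to avoid the duplication, as a sketch rather than a recommendation: keep the per-element conversion in a single method and let both entry points delegate to it. The Request and Response classes and the upper-casing logic below are hypothetical placeholders for the real gRPC messages and conversion.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConvertSketch {

    // Hypothetical message types standing in for the gRPC Request and Response.
    static final class Request {
        final String payload;
        Request(String payload) { this.payload = payload; }
    }

    static final class Response {
        final String payload;
        Response(String payload) { this.payload = payload; }
    }

    // The conversion logic lives in exactly one place.
    static Mono<Response> convert(Request request) {
        return Mono.fromSupplier(() -> new Response(request.payload.toUpperCase()));
    }

    static Mono<Response> convertSingle(Mono<Request> request) {
        return request.flatMap(ConvertSketch::convert);
    }

    // concatMap keeps responses in request order; flatMap would allow interleaving.
    static Flux<Response> convertMany(Flux<Request> requests) {
        return requests.concatMap(ConvertSketch::convert);
    }
}
```

If the conversion is purely synchronous, convert could return a plain Response and both methods could use map instead.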
Hello everyone,
For my graduation project at the university, I am evaluating the benefits of mutation testing to validate the effectiveness of StepVerifier tests for reactive programs with Project Reactor. Existing mutators of Java mutation testing tools do not suffice here, because we also need to validate if the streaming nature of reactive programs is tested well enough.
(Mutation testing is the method of validating software tests, where changes are made to the source code (mutants) to test if the test code is able to detect the changes. Part of the mutation testing tool that makes a specific change to the source code is called a mutator.)
For this reason I am trying to extend an existing mutation testing tool with new mutators. To do that, I need to find ways to mutate reactive code that are specific to the reactive paradigm. At a later phase we are going to examine how many of these mutations are picked up by the StepVerifier tests of open-source projects. A first mutator idea is to add a sleep call to every function, to check whether the time constraints of reactive programs are tested.
Are there people here with experience in Project Reactor and StepVerifier testing who have suggestions for other mutators that could be created to test the effectiveness of StepVerifier tests?
This will help me find out how we can make the most effective method to validate StepVerifier tests, and hopefully contribute to improving reactive code quality.
Thank you in advance.
Process newProcess = Runtime.getRuntime().exec(process);
but it should in some way wait for the process to complete with Mono.just(newProcess.waitFor()); before it returns the Mono. It seems that this never happens, even when the process completes successfully. waitFor returns an int, but I never get the int back in the calling method. Any help would be appreciated.
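A possible sketch, assuming the aim is a Mono that yields the exit code: Mono.just(newProcess.waitFor()) evaluates waitFor() immediately on the calling thread, whereas Mono.fromCallable defers the blocking call until something subscribes, and subscribeOn(Schedulers.boundedElastic()) moves it to a thread meant for blocking work. The class and method names are illustrative. The output is discarded here because a process whose output is never read can stall once the pipe buffer fills.

```java
import java.lang.ProcessBuilder.Redirect;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ProcessSketch {

    // Starts the command when subscribed and emits its exit code on completion.
    static Mono<Integer> exitCode(String... command) {
        return Mono.fromCallable(() -> {
                Process process = new ProcessBuilder(command)
                    .redirectErrorStream(true)          // merge stderr into stdout
                    .redirectOutput(Redirect.DISCARD)   // drop output so the process cannot stall on a full pipe
                    .start();
                return process.waitFor();               // blocking, hence boundedElastic below
            })
            .subscribeOn(Schedulers.boundedElastic());
    }
}
```

The caller still has to subscribe to the returned Mono (or return it to a framework that does); without a subscription the process is never started.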
We're using Reactor to queue and execute asynchronous tasks, which seems like a standard use case.
We often want to know if there are any outstanding tasks in flight. For example, is there a "save file" operation still in progress?
but I'm having a lot of trouble coming up with the right pattern for that
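One simple pattern to consider, sketched with made-up names: wrap each task so that a counter goes up when the task is subscribed and comes down in doFinally, which runs on completion, error, and cancellation alike.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class InFlightTracker {

    private final AtomicInteger inFlight = new AtomicInteger();

    // Decorates a task so that it is counted from subscription until termination.
    public <T> Mono<T> track(Mono<T> task) {
        return task
            .doOnSubscribe(subscription -> inFlight.incrementAndGet())
            .doFinally(signal -> inFlight.decrementAndGet());
    }

    // True while at least one tracked task has started and not yet terminated.
    public boolean hasOutstandingTasks() {
        return inFlight.get() > 0;
    }
}
```

A separate tracker per kind of operation (for example one for "save file") would answer the more specific question. Tasks that are queued but not yet subscribed are not counted by this sketch.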
Hi Folks, I’m trying to get a count of the frequency words occur in a flux of strings. I note from this issue reactor/reactor-core#931 Simon suggests ‘That said, maybe groupBy is not the ideal tool for this job. Consider exploring using Flux.collect(Supplier, BiConsumer), where the container is e.g. a Map<String, Integer> and the BiConsumer increments the Integer when it encounters a word several times.’
I'd appreciate a steer on how to implement this, thanks.
Cheers,
Jimmy.
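A minimal sketch of the Flux.collect(Supplier, BiConsumer) suggestion quoted above, assuming words are separated by whitespace and should be counted case-insensitively (both are assumptions, as is the class name):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class WordCountSketch {

    static Mono<Map<String, Integer>> countWords(Flux<String> lines) {
        return lines
            // Split every line into lower-cased words.
            .flatMapIterable(line -> Arrays.asList(line.toLowerCase().split("\\s+")))
            // A leading space produces an empty token, so drop those.
            .filter(word -> !word.isEmpty())
            // The map is the container; merge adds 1 to the existing count or starts at 1.
            .<Map<String, Integer>>collect(HashMap::new,
                (counts, word) -> counts.merge(word, 1, Integer::sum));
    }
}
```

The resulting Mono emits a single map once the source Flux completes, so this only suits finite sources.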
Flux<String> differently than the subsequent ones without using an external variable? I'm streaming a CSV with a header where each line is mapped, i.e. something like:
csvFlux.mapFirst(Utils::transformHeader) // applies function to header
    .map(Utils::transformRow); // applies different function to each row
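I'm not aware of a mapFirst operator in Reactor, but one sketch that avoids an external variable is to pair each line with its position using index() and branch on position 0. The two transform methods below are hypothetical stand-ins for Utils::transformHeader and Utils::transformRow.

```java
import reactor.core.publisher.Flux;

public class HeaderRowSketch {

    // Hypothetical stand-ins for Utils::transformHeader and Utils::transformRow.
    static String transformHeader(String header) { return header.toUpperCase(); }
    static String transformRow(String row) { return row.trim(); }

    static Flux<String> transform(Flux<String> csvFlux) {
        return csvFlux
            .index() // pairs each line with its 0-based position
            .map(indexed -> indexed.getT1() == 0
                ? transformHeader(indexed.getT2())
                : transformRow(indexed.getT2()));
    }
}
```

Flux.switchOnFirst may also be worth a look when the header should influence how the remaining rows are processed.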
List<Mono<Response>> responses = new ArrayList<>();
Mono.just(requests)
    .flatMapIterable(request -> request)
    .flatMap(request -> {
        // requestService contains a WebClient call to different rest endpoints
        Mono<Response> response = requestService.handle(request)
            .subscribeOn(Schedulers.boundedElastic())
            .take(Duration.ofMillis(80000));
        responses.add(response);
        return response;
    })
    .elapsed()
    .doOnNext(tupel -> {
        // logic here to determine if certain criteria are met in tupel.getT2()
        for (Mono<Response> response : responses) {
            // based on criteria we can reduce duration or
            // return immediately, either by setting duration to 0 or some other method
            response.take(Duration.ofMillis(newDuration));
        }
    })
    .map(tupel -> tupel.getT2());
Hello everyone!
I have a very insane reactive streams pipeline (it’s kind of batch processing) and I’m getting java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 200000ms in 'flatMap' (and no fallback has been configured)
The problem is that I do not know which part of the stream is responsible for throwing the exception:
2022-05-16 08:01:51.037 ERROR 1 --- [ parallel-3] c.g.b.s.pdm.NewImosOrganizerService : java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 200000ms in 'flatMap' (and no fallback has been configured)
bearcat_1 |
bearcat_1 | java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 200000ms in 'flatMap' (and no fallback has been configured)
bearcat_1 | at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295)
bearcat_1 | at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280)
bearcat_1 | at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419)
bearcat_1 | at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
bearcat_1 | at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
bearcat_1 | at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
bearcat_1 | at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
bearcat_1 | at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
bearcat_1 | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
bearcat_1 | at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
bearcat_1 | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
bearcat_1 | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
bearcat_1 | at java.base/java.lang.Thread.run(Thread.java:833)
bearcat_1 |
bearcat_1 | 2022-05-16 08:01:51.042 ERROR 1 --- [ parallel-3] reactor.core.publisher.Operators : Operator called default onErrorDropped
bearcat_1 |
bearcat_1 | reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 200000ms in 'flatMap' (and no fallback has been configured)
bearcat_1 | Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 200000ms in 'flatMap' (and no fallback has been configured)
bearcat_1 | at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295)
bearcat_1 | at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280)
bearcat_1 | at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419)
bearcat_1 | at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
bearcat_1 | at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
bearcat_1 | at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
bearcat_1 | at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
bearcat_1 | at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
bearcat_1 | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
bearcat_1 | at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
bearcat_1 | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
bearcat_1 | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
bearcat_1 | at java.base/java.lang.Thread.run(Thread.java:833)
bearcat_1 |
Can someone please help?