Stefan Dragisic
@91stefan__twitter

I would like to share with you the Practical Reactor workshop that I have created. If you like a practical, hands-on approach to learning things, or you want to improve your existing Project Reactor skills, this workshop is for you. It contains over 100 unique exercises and covers most of Project Reactor's operators, semantics, etc.

https://github.com/schananas/practical-reactor

If you decide to try it out and notice any issues, have an idea for an interesting example/exercise, or have feedback, please share it, either here or by opening an issue on GitHub.

3 replies
marios sofocleous
@sofocleous2_twitter
Hi everyone, do you know if I can deploy SCDF on AWS ECS, and whether I can take advantage of autoscaling and resiliency?
aconst-null
@aconst-null
Hi there. Wondering if anyone can help me with a reactor 'idiomatic' way to solve a problem I've got. Let's say I've got a flux of X. Depending on a property of X, I might need to go do some more work for it, e.g. fetch something for this particular X value which helps me transform it. Let's call this operation Y. This is fine - I can just flatMap or whatever - when I see an X of interest, I do operation Y, otherwise I let X pass through as is. So far so good. However: I only want a max of N operation Ys to be in flight at one time (let's say they are massively memory intensive or whatever). The question is: how do I cleanly limit the number of Y operations? The tricky thing (to me) is that not every X requires the operation. Only, say, 1 in every few thousand X's. I'm not quite sure how to 'limit' a subset of these operations: I can't simply apply it to the entire flux...
4 replies
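
A minimal sketch of one possible approach (the names needsY, doY and MAX_Y are hypothetical): split the stream with groupBy on the predicate and apply a concurrency-capped flatMap only to the group that needs operation Y, leaving the rest untouched.

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    class LimitedEnrichment {
        static final int MAX_Y = 4; // at most 4 Y operations in flight at once

        static Flux<String> enrich(Flux<String> xs) {
            return xs.groupBy(LimitedEnrichment::needsY)
                     .flatMap(group -> Boolean.TRUE.equals(group.key())
                             ? group.flatMap(LimitedEnrichment::doY, MAX_Y) // capped concurrency
                             : group);                                      // pass-through, uncapped
        }

        static boolean needsY(String x) { return x.startsWith("special"); }      // hypothetical predicate
        static Mono<String> doY(String x) { return Mono.just(x + "-enriched"); } // hypothetical expensive op
    }

Note that this gives up overall ordering between the two paths; if order matters, a single flatMapSequential with a conditional inner publisher may be a better fit.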
kaushik-gopu1998
@kaushik-gopu1998

Hello, I am new to Project Reactor. Here is my code; it's just a simple update operation and the functionality works fine, but I'm struggling to make the code cleaner and more readable. Since I'm new to this reactor world, can someone help me make this more concise?

public Mono<ServerResponse> updateStudent(Mono<Student> student) {
    return student.flatMap(newStudent -> {
        if (newStudent.getPin() == null) {
            Mono<Boolean> isExists = validateEmailExistsorNot(newStudent.getEmailId());
            return isExists.flatMap(result -> {
                if (result == false) { // create record
                    return seqGenRepo.generateSequence().flatMap(seqId -> {
                        newStudent.setPin(seqId);
                        return studentRepo.updateRecord(newStudent)
                                .flatMap(newRecord ->
                                        ServerResponse.status(HttpStatus.CREATED)
                                                .contentType(MediaType.APPLICATION_JSON)
                                                .body(BodyInserters.fromValue("record has been created"))
                                );
                    });
                } else {
                    return ServerResponse.status(HttpStatus.BAD_REQUEST)
                            .contentType(MediaType.APPLICATION_JSON)
                            .body(BodyInserters.fromValue("duplicate Email Id"));
                }
            });
        } else { // update record
            return studentRepo.updateRecord(newStudent)
                    .flatMap(updatedRecord ->
                            ServerResponse.status(HttpStatus.CREATED)
                                    .contentType(MediaType.APPLICATION_JSON)
                                    .body(BodyInserters.fromValue("record has been updated"))
                    );
        }
    });
}
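
One possible restructuring, as a sketch: keep the original repository and helper names, split the two branches into small methods, and extract the repeated response building. Note that using then(...) instead of flatMap(...) assumes you don't need the value emitted by updateRecord.

    public Mono<ServerResponse> updateStudent(Mono<Student> student) {
        return student.flatMap(newStudent ->
                newStudent.getPin() == null ? createRecord(newStudent) : updateExisting(newStudent));
    }

    private Mono<ServerResponse> createRecord(Student student) {
        return validateEmailExistsorNot(student.getEmailId())
                .flatMap(exists -> exists
                        ? respond(HttpStatus.BAD_REQUEST, "duplicate Email Id")
                        : seqGenRepo.generateSequence()
                                .doOnNext(student::setPin)
                                .then(studentRepo.updateRecord(student))
                                .then(respond(HttpStatus.CREATED, "record has been created")));
    }

    private Mono<ServerResponse> updateExisting(Student student) {
        return studentRepo.updateRecord(student)
                .then(respond(HttpStatus.CREATED, "record has been updated"));
    }

    private Mono<ServerResponse> respond(HttpStatus status, String message) {
        return ServerResponse.status(status)
                .contentType(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromValue(message));
    }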
Daniel Wahlqvist
@offroff

We're getting data from a gRPC service and I implemented a "messageListener" that listens to the gRPC data stream and turns the data into a flux; the code looks like this:

    public Flux<Map<String, Object>> getMetricsAsFlux(Metric.MetricGetQuery query) {
        MessageListener<Metric.MetricGetReply> messageListener = new MessageListener<>();
        iqApiStub.withCallCredentials(callCredentials).metricGet(query, messageListener);
        return messageListener.getFlux()
                .map(IqarosGRPCMapper::toObjectMap);
    }

Then I realized I need the ability to retry the query on certain errors and I came up with this:

    public Flux<Map<String, Object>> getMetricsAsFlux(Metric.MetricGetQuery query) {
        MessageListener<Metric.MetricGetReply> messageListener = new MessageListener<>();
        Flux<Map<String, Object>> flux = messageListener.getFlux()
                .doOnSubscribe(sub -> {
                    iqApiStub.withCallCredentials(callCredentials).metricGet(query, messageListener);
                })
                .map(IqarosGRPCMapper::toObjectMap);
        return flux;
    }

I'm quite happy this is working: if I get an error we can do a retry, and that will trigger a new subscription and a new query to the gRPC service. However, I fear I'm abusing the API here. Can you suggest some other (and better) patterns?

1 reply
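
One alternative pattern, as a sketch: wrap the per-subscription setup in Flux.defer, so each (re)subscription builds a fresh listener and issues a fresh gRPC call. This assumes MessageListener buffers signals that arrive before its Flux is subscribed (the types are the ones from the snippet above).

    public Flux<Map<String, Object>> getMetricsAsFlux(Metric.MetricGetQuery query) {
        return Flux.defer(() -> {
                    MessageListener<Metric.MetricGetReply> messageListener = new MessageListener<>();
                    iqApiStub.withCallCredentials(callCredentials).metricGet(query, messageListener);
                    return messageListener.getFlux();
                })
                .map(IqarosGRPCMapper::toObjectMap);
    }

Functionally this is close to the doOnSubscribe version, but each retry also gets its own listener instance instead of re-using one across subscriptions.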
Lukas Vondracek
@_gondri_twitter
Hi guys, are there any best practices for doing ManyToMany using R2DBC? I found that mixing JPA and R2DBC is not good practice, but how do I handle a case where a user can have multiple roles and a role can be assigned to multiple users? I am using Kotlin and Spring Boot (also experimenting with JPA added, even though it's not recommended).
1 reply
Michael Nitschinger
@daschl
Is there a best practice on how to convert a Flux into an InputStream with proper backpressure semantics?
Pratham Krishna
@Prathamkrishna_gitlab
Hey, I've been stuck at this one point and I can't seem to figure out what the solution could be:
Flux<UserArtistsDto> userArtistsFlux = fetchSimilarProfilesBasedOnProfiles.getRelatedProfilesByArtists(
        "Travis Scott", "G-Eazy", "Hole", "blackbear", "no", "prathamkrishna"
);
List<UserProfileArtistsDto> list = new ArrayList<>();
Flux<UserProfileArtistsDto> mapFlux = userArtistsFlux
        .flatMapSequential(val -> {
            UserProfileArtistsDto userProfileArtistsDto = new UserProfileArtistsDto();
            userProfileArtistsDto.setUsername(val.getUsername());
            userProfileArtistsDto.setUserprofilelink(val.getUserprofilelink());
            fetchSimilarProfilesBasedOnProfiles.getUserArtist(val.getUsername()).subscribe(artist -> {
                if (userProfileArtistsDto.getArtists() == null) {
                    userProfileArtistsDto.setArtists(new ArrayList<>() {{
                        add(artist);
                    }});
                } else {
                    userProfileArtistsDto.getArtists().add(artist);
                }
            });
            list.add(userProfileArtistsDto);
            return Flux.fromIterable(list);
        });
return ServerResponse.ok()
        .contentType(MediaType.TEXT_EVENT_STREAM)
        .body(mapFlux, UserProfileArtistsDto.class);
2 replies
What the code has to do: fetch usernames based on artists, and per fetch run a second query to fetch other details of the user and publish those details.
What the code is doing: fetching usernames based on artists and returning those details while the second query is still being executed, so the second query's results are not passed down by the time the response is sent.
What could be a possible solution for this?
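
One possible direction, as a sketch: keep the second query inside the chain instead of calling subscribe() on it, so the per-user details are complete before the element is emitted (this assumes getUserArtist returns a Flux of artists):

    Flux<UserProfileArtistsDto> mapFlux = userArtistsFlux
            .flatMapSequential(val ->
                    fetchSimilarProfilesBasedOnProfiles.getUserArtist(val.getUsername())
                            .collectList()                             // wait for the second query to finish
                            .map(artists -> {
                                UserProfileArtistsDto dto = new UserProfileArtistsDto();
                                dto.setUsername(val.getUsername());
                                dto.setUserprofilelink(val.getUserprofilelink());
                                dto.setArtists(new ArrayList<>(artists));
                                return dto;
                            }));

The external list and the nested subscribe() go away; that shared mutable state is what lets half-filled DTOs escape before the artist lookup completes.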
HaloFour
@HaloFour
Is there a preferred method for fire&forget scenarios? For example, I want to write some data to a cache during a request but I don't want the request to be delayed waiting on that operation to complete. I'm currently using subscribe() but I know that's not recommended since it short-circuits back pressure. What about publishing the write operations to a shared sink and having the subscriber perform the cache write operations?
Pratham Krishna
@Prathamkrishna_gitlab
you could use CompletableFuture.runAsync and it'll run on a separate background thread
HaloFour
@HaloFour
The cache is reactive and returns a Mono<T>, but I don't want to compose it into the reactive chain as that will delay completion until the cache operation completes. That's why I'm currently calling subscribe() on that Mono<T>.
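
A rough sketch of the shared-sink idea (CacheWrite is a hypothetical record(key, value) and cache is the reactive cache; the draining pipeline is the only subscriber of the writes):

    // created once, e.g. as a bean
    Sinks.Many<CacheWrite> cacheWrites = Sinks.many().unicast().onBackpressureBuffer();

    // single background pipeline drains the sink and performs the reactive cache writes
    cacheWrites.asFlux()
            .flatMap(write -> cache.put(write.key(), write.value())    // Mono<Void> from the reactive cache
                    .onErrorResume(e -> Mono.empty()), 8)              // one failed write shouldn't kill the drain
            .subscribe();

    // in the request path: emit and move on, without waiting for the write
    // (tryEmitNext can fail under concurrent emission; check the EmitResult in real code)
    cacheWrites.tryEmitNext(new CacheWrite(key, value));

Compared with calling subscribe() per request, this gives one place to bound concurrency and handle errors for the background writes.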
rs2152
@rs2152

Hey, I'm trying to implement back-pressure in my quiz system.
Requirement: multiple quizzes are running, and a user subscribes to a single quiz and takes it, but when I publish an event it is sent to all quizzes that are running at that time. Can anybody help me out?

private final Sinks.Many<Response> sink;
// publishing is done via sink::tryEmitNext

@Bean
public Sinks.Many<Response> sink(){
    return Sinks.many().replay().latest();
}

@Bean
public Flux<Response> flux(Sinks.Many<Response> sink){
    return sink.asFlux();
}
1 reply
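
One possible direction, as a sketch: keep the shared sink but filter per subscriber so each user only receives events for the quiz they subscribed to (getQuizId is a hypothetical accessor); alternatively, keep one sink per running quiz in a map.

    public Flux<Response> quizStream(String quizId, Flux<Response> allEvents) {
        return allEvents.filter(response -> quizId.equals(response.getQuizId()));
    }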
Volkan Yazıcı
@vy
What is the effective difference between .flatMap(body, concurrency) and .parallel(concurrency).concatMap(body) when both are backed by a scheduler containing concurrency threads?
2 replies
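
For concreteness, the two shapes being compared (a sketch; the scheduler placement is usually where they differ in practice):

    // (a) up to `concurrency` inner publishers subscribed at once, results merged as they arrive
    source.flatMap(body, concurrency);

    // (b) the source split into `concurrency` rails; each rail applies `body` one element at a time
    source.parallel(concurrency)
          .runOn(scheduler)       // without runOn, ParallelFlux stays on the source thread
          .concatMap(body);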
HaloFour
@HaloFour
Thoughts on exposing more of an API for interpreting tracebacks? I'm trying to work it into a shortened stack trace for the purposes of logging but it's opaque and pretty raw. Ideally I'd like to be able to shorten class names, limit the number of traceback frames, etc., without having to interpret the text.
Omid Dehghan
@odchan1_twitter
Hey everyone :)
I have a website where I have published 1,500 SEO-optimized articles on teaching the core of a few programming languages. Now I have decided to sell the site. Please send me a DM if you're interested.
P.S. I know the request is not specifically relevant to this community, but I thought it might be of public interest.
Thank you
surendrarathore
@surendrarathore
public Mono<ServerResponse> foo(ServerRequest request) {

    return request.bodyToMono(Test.class).flatMap(regVer -> {
        return this.handlerV1Repo.findDuplicateByToken(regVer.getToken()).flatMap(exsist -> {
            if (exsist) {
                return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(new DedubRes(
                        RES_STATUS.FAIL.getCode(), Arrays.asList(new ErrorModel("", "found duplicate"))));

            } else {
                return this.handlerV1Repo.saveRepo(regVer).flatMap(sData -> {
                    return ServerResponse.status(HttpStatus.CREATED).contentType(MediaType.APPLICATION_JSON)
                            .bodyValue(new DedubRes(RES_STATUS.SUCCESS.getCode(), Collections.emptyList()));

                }).switchIfEmpty(ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(new DedubRes(
                        RES_STATUS.FAIL.getCode(), Arrays.asList(new ErrorModel("", "unable to insert data")))));

            }

        });
    });

}
How can I restructure this code with method chaining?
I don't understand how the first flatMap's parameter value relates to the other flatMap's parameter value.
1 reply
rs2152
@rs2152
Hi, how can we cancel the back-pressure subscription in WebFlux for a single user?
theNikki1
@Nikki1The_twitter
Reactor noob here. I'm working in a legacy Spring JPA project where we are also using Reactor Core. Now my coworkers are suggesting an approach where the REST controller invokes a service. The service basically calls a JpaRepository, transforms the list response to a Stream, and then maps each item in the Stream to a Mono (another DB call, where we use subscribeOn as the docs suggest). Finally we get a Stream<Mono<T>>, which is fed to Flux.fromStream and then flattened, finally returning Flux<T>. To my understanding this approach is incorrect, because the first DB call is blocking (run in the same thread as the REST controller)... so basically
return Flux.fromStream(jpaRepo.getList(id).toStream().map(this::toMono)).flatMap(Function.identity())
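
A sketch of one way to keep that first blocking JPA call off the controller thread (method names reused from the line above; this assumes getList returns a List or other Iterable):

    return Mono.fromCallable(() -> jpaRepo.getList(id))
            .subscribeOn(Schedulers.boundedElastic())   // run the blocking repository call on a blocking-tolerant scheduler
            .flatMapMany(Flux::fromIterable)
            .flatMap(this::toMono);

Whether a reactive facade over JPA is worth it is a separate question, but at least the blocking work no longer runs on the caller's thread.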
Yaroslav Matveychuk
@yaroslavm

Hi everybody,
not sure if reactor is applicable in my case:
I have TCP server that needs to do the following:

  • allow clients to connect
  • should keep client's connection open
  • clients send registration message
  • tcp server should feed clients with some data periodically (once it is ready, per internal logic)
  • disconnect clients after some period of time
    TcpServer.create()
      .handle((in, out) -> {
          Flux<String> flux = Flux.create(sink -> {
              for (int i = 0; i < 10; i++) {
                  sink.next("i = " + i + "\n");
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          });
          return out.sendString(flux).neverComplete();
      })
      .bindNow();
    I suppose that in place of this flux I can do/call any code, even blocking code.
    Is that the right direction to go?
    Also, is it ok to save the sink of Flux.generate somewhere and call it from another thread?

Have a good evening and thanks in advance for your help!

8 replies
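
A sketch of one non-blocking shape for the handler above: an externally held Sinks.Many (fed by your internal logic from any thread) replaces the sleep loop, and take(Duration) handles the disconnect. Note that the sink passed to Flux.generate is a SynchronousSink and must only be used inside the generator callback; a sink you stash and call from other threads should come from Sinks.many() or Flux.create instead.

    Sinks.Many<String> feed = Sinks.many().multicast().onBackpressureBuffer();

    TcpServer.create()
            .handle((in, out) -> out.sendString(
                    feed.asFlux()
                        .take(Duration.ofMinutes(5))))   // drop the client after some period of time
            .bindNow();

    // elsewhere, whenever the internal logic has produced something:
    feed.tryEmitNext("some data\n");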
anastasiya-kaluzhonak
@anastasiya-kaluzhonak
Hi here!
As I see, WorkQueueProcessor is deprecated in Reactor 3.4.* and will be removed in 3.5.0. Based on the documentation, it should somehow be replaced with Sinks.
Is it possible with Sinks to distribute the signals to only one of the subscribed subscribers and to share the demand amongst all subscribers (as is done in WorkQueueProcessor)?
3 replies
Sergey Kurenchuk
@kurenchuksergey
Hi here!
I am trying to understand how to find hot threads that are using CPU, and how to improve performance.
In my case, the threads from the http-event-pool always "run away" to do some business logic when I just handle results from WebClient. Is that correct behaviour?
Should I always reschedule the result onto my own business-logic scheduler?
Do you do the same in your practice?
And how do you find hot spinlocks in your code without separate threads?
Thanks 😊
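
A minimal sketch of the rescheduling idea (the scheduler creation and method names are illustrative):

    Scheduler businessScheduler = Schedulers.newParallel("business-logic");

    webClient.get().uri("/metrics")
            .retrieve()
            .bodyToMono(String.class)
            .publishOn(businessScheduler)      // everything below runs on business-logic threads,
            .map(this::heavyBusinessLogic);    // not on the reactor-http-nio event loop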
Colt
@coltstrgj:matrix.org
I'm not super familiar with WorkQueueProcessor so I'm not 100% sure that would work but from what I'm seeing at a quick glance it should.

I also have a question that I have several answers to. I'm just looking for advice on the best practice.

I'm doing some gRPC stuff and have methods like
Mono<Response> convertSingle(Mono<Request> request);
and
Flux<Response> convertMany(Flux<Request> requests);

Request is 1:1 with Response. They do the same thing, so I don't want to write the same code twice. What would be the preferred way to call one method from the other in order to convert?
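
One arrangement, as a sketch: implement one variant and derive the other from it (pick a single direction; the method you don't derive keeps the real conversion logic).

    // derive the single-variant from the many-variant:
    Mono<Response> convertSingle(Mono<Request> request) {
        return convertMany(request.flux()).next();   // Mono#flux wraps, next() takes the single result back out
    }

    // or derive the many-variant from the single one:
    Flux<Response> convertMany(Flux<Request> requests) {
        return requests.flatMapSequential(r -> convertSingle(Mono.just(r)));
    }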

Juan Marín
@juanpmarin
Hi! Is there a way to synchronize two independent flows? I want to guarantee that only one can access a resource at the same time, similar to what a mutex or the synchronized keyword does, but in a non-blocking way.
5 replies
code-URI
@code-uri

Hello everyone,
Can someone help me understand what EmitResult.FAIL_CANCELLED means and when it is emitted.

Documentation says this

        /**
         * Has failed to emit the signal because the sink was previously interrupted by its consumer
         */
        FAIL_CANCELLED,
Nick Moone
@nickmoone

Hello everyone,

For my graduation project at the university, I am evaluating the benefits of mutation testing to validate the effectiveness of StepVerifier tests for reactive programs with Project Reactor. Existing mutators of Java mutation testing tools do not suffice here, because we also need to validate if the streaming nature of reactive programs is tested well enough.

(Mutation testing is the method of validating software tests, where changes are made to the source code (mutants) to test if the test code is able to detect the changes. Part of the mutation testing tool that makes a specific change to the source code is called a mutator.)

For this reason I am trying to make an extension to an existing mutation testing tool, with new mutators. Therefore I need to find ways in which I can mutate reactive code that are specific to the reactive paradigm. At a later phase we are going to examine how many of these mutations are picked up by StepVerifier tests of open-source projects. A first mutator idea is to add a sleep call to every function, to check if the time constraints of reactive programs are tested.

Are there people here, experienced with Project Reactor reactive programs and StepVerifier testing, who have suggestions on other mutators that could be created to test the effectiveness of StepVerifier tests for reactive programs with Project Reactor?
This will help me find out how we can make the most effective method to validate StepVerifier tests, and hopefully contribute to improving reactive code quality.

Thank you in advance.

Tim L
@seabamirum
Is there a reactive version of the org.springframework.validation.Validator interface, which would have a Mono return value for the Errors? I need to do some custom validation to check if a username exists, but am struggling not to use block()...
Knut Schleßelmann
@kschlesselmann
Any reasons why there is collectList() but not collectSet() on Flux?
1 reply
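
In the meantime, one workaround is the Collector-based collect, as a sketch (assuming namesFlux is a Flux<String>):

    import java.util.Set;
    import java.util.stream.Collectors;

    Mono<Set<String>> unique = namesFlux.collect(Collectors.toSet());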
gertyes
@gertyes
Hi all, I need to write a reactive method to execute a system call. I need to use Process newProcess = Runtime.getRuntime().exec(process); but it should in some way wait for the process to complete with Mono.just(newProcess.waitFor()); before it returns the Mono. It seems that this never happens, even when the process completes successfully. This waitFor returns an int, but I never get the int back in the calling method. Any help would be appreciated.
3 replies
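
A sketch of one way to keep the blocking exec/waitFor pair off reactive threads (command handling simplified):

    Mono<Integer> exitCode = Mono.fromCallable(() -> {
                Process newProcess = Runtime.getRuntime().exec(command);
                return newProcess.waitFor();            // blocks until the process completes
            })
            .subscribeOn(Schedulers.boundedElastic());  // so the blocking wait never runs on a reactive thread

Wrapping waitFor() in Mono.just evaluates it eagerly, on whatever thread builds the Mono; fromCallable defers it to subscription time.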
keturn
@keturn:matrix.org

We're using Reactor to queue and execute asynchronous tasks, which seems like a standard use case.

We often want to know if there are any outstanding tasks in flight. For example, is there a "save file" operation still in progress?

but I'm having a lot of trouble coming up with the right pattern for that
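
A rough sketch of one pattern for this (the task and names are hypothetical): count subscriptions in and terminations out.

    AtomicInteger inFlight = new AtomicInteger();

    Mono<Void> saveFile = doSaveFile()                          // the actual async task
            .doOnSubscribe(s -> inFlight.incrementAndGet())
            .doFinally(signal -> inFlight.decrementAndGet());   // complete, error and cancel all count as done

    boolean saving = inFlight.get() > 0;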

mxz9102
@mxz9102
Hi, Reactor devs, my Spring Boot app was not able to start Netty after it was upgraded to 2.5.13 recently. An InterruptedException was thrown out of reactor.core.publisher.BlockingSingleSubscriber.blockingGet. I created a question on Stack Overflow: https://stackoverflow.com/questions/72193773/unable-to-start-netty-in-spring-boot-2-5-13. Can anybody take a look and let me know how I can resolve this? Thanks a lot.
jimmy-magee
@jimmy-magee

Hi folks, I'm trying to get a count of how frequently words occur in a flux of strings. I note from issue reactor/reactor-core#931 that Simon suggests: 'That said, maybe groupBy is not the ideal tool for this job. Consider exploring using Flux.collect(Supplier, BiConsumer), where the container is e.g. a Map<String, Integer> and the BiConsumer increments the Integer when it encounters a word several times.'

I'd appreciate a steer on how to implement this, thanks.

Cheers,
Jimmy.

5 replies
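
A minimal sketch of that suggestion, assuming the words are already split out into a Flux<String> called words:

    Mono<Map<String, Integer>> wordCounts = words
            .collect(HashMap::new,
                     (map, word) -> map.merge(word, 1, Integer::sum));   // increment the count per word

If the source is a flux of whole lines, a flatMap(line -> Flux.fromArray(line.split("\\s+"))) in front produces the individual words.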
Pratham Krishna
@Prathamkrishna_gitlab
Hey, I'm stuck at this one spot. What exactly does "FluxUsingWhen" mean?
1 reply
I'm merging two publishers using Flux.concat (since the first parameter in the concat is a DB call which may cause delays, and concat adds items to the Flux sequentially).
The second parameter is getting inserted, but the first isn't,
and when I print it out, I get "FluxUsingWhen" for the first parameter's result.
michaelplavnik
@michaelplavnik
Hi all. In version 3.4.8, cancel is NOT propagated upstream by groupBy() (independently of the GroupedFlux state). Is this expected behaviour? What is the rationale behind this choice?
Kormákur
@korri123
Is it possible to map the first element of a Flux<String> differently than subsequent ones, without using an external var? I'm streaming a CSV with a header, where each line is mapped. I.e. something like
csvFlux.mapFirst(Utils::transformHeader) // applies function to header
  .map(Utils::transformRow); // applies different function to each row
6 replies
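
One way without an external var, as a sketch: Flux.index() pairs every element with its position, so the header (index 0) can be routed to a different function. This assumes both transforms return the same type (the Utils methods are the ones from the question); switchOnFirst is another option if the header needs different handling entirely.

    Flux<String> rows = csvFlux
            .index()
            .map(t -> t.getT1() == 0
                    ? Utils.transformHeader(t.getT2())   // first line: header
                    : Utils.transformRow(t.getT2()));    // every other line: data row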
Ant
@twitixco_twitter
Hello, everyone. I need to make n requests to various REST APIs. There is an initial timeout, but I want to be able to shorten the timeout, or return immediately, if, as the responses come in, a response meets a certain criterion. Is there a way to accomplish this? Any help is greatly appreciated. Thanks!
    List<Mono<Response>> responses = new ArrayList<>();
    Mono.just(requests)
        .flatMapIterable(request -> request)
        .flatMap(request -> {
            // requestService contains a WebClient call to different rest endpoints
            Mono<Response> response = requestService.handle(request)
                .subscribeOn(Schedulers.boundedElastic())
                .take(Duration.ofMillis(80000));
            responses.add(response);
            return response;
        })
        .elapsed()
        .doOnNext(tupel -> {
            // logic here to determine if certain criteria are met in tupel.getT2()
            for (Mono<Response> response : responses) {
                // based on criteria we can reduce duration or
                // return immediately, either by setting duration to 0 or some other method
                response.take(Duration.ofMillis(newDuration));
            }
        })
        .map(tupel -> tupel.getT2());
1 reply
Ricardo Gaspar
@rjbgaspar

Hello everyone!
I have a very insane reactive streams pipeline (it’s kind of batch processing) and I’m getting java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 200000ms in 'flatMap' (and no fallback has been configured)
The problem is that I do not know which part of the stream is responsible for throwing the exception:

2022-05-16 08:01:51.037 ERROR 1 --- [     parallel-3] c.g.b.s.pdm.NewImosOrganizerService      : java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 200000ms in 'flatMap' (and no fallback has been configured)
bearcat_1  |
bearcat_1  | java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 200000ms in 'flatMap' (and no fallback has been configured)
bearcat_1  |    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295)
bearcat_1  |    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280)
bearcat_1  |    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419)
bearcat_1  |    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
bearcat_1  |    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
bearcat_1  |    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
bearcat_1  |    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
bearcat_1  |    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
bearcat_1  |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
bearcat_1  |    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
bearcat_1  |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
bearcat_1  |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
bearcat_1  |    at java.base/java.lang.Thread.run(Thread.java:833)
bearcat_1  |
bearcat_1  | 2022-05-16 08:01:51.042 ERROR 1 --- [     parallel-3] reactor.core.publisher.Operators         : Operator called default onErrorDropped
bearcat_1  |
bearcat_1  | reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 200000ms in 'flatMap' (and no fallback has been configured)
bearcat_1  | Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 200000ms in 'flatMap' (and no fallback has been configured)
bearcat_1  |    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295)
bearcat_1  |    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280)
bearcat_1  |    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419)
bearcat_1  |    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
bearcat_1  |    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
bearcat_1  |    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
bearcat_1  |    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
bearcat_1  |    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
bearcat_1  |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
bearcat_1  |    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
bearcat_1  |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
bearcat_1  |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
bearcat_1  |    at java.base/java.lang.Thread.run(Thread.java:833)
bearcat_1  |

Can someone please help?

8 replies
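
One way to narrow down which flatMap it is, as a sketch: sprinkle checkpoint() descriptions (or enable Hooks.onOperatorDebug() / the ReactorDebugAgent while debugging) so the failing operator's assembly site shows up alongside the exception. The operator names here are placeholders:

    someFlux
        .flatMap(this::stepOne)
        .checkpoint("after stepOne flatMap")
        .flatMap(this::stepTwo)
        .checkpoint("after stepTwo flatMap")
        .timeout(Duration.ofSeconds(200));

Another option is to give each timeout its own fallback, or move the timeout() closer to the individual flatMap you suspect, so the error message itself identifies the stage.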
JoyAhan
@JoyAhan
Hi
Can the Reactor-based Spring State Machine work with a non-reactive stack?