Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
ravirajsc1
@ravirajsc1
Hi All, a design question if experts can guide... we are building new apps where we need a spring cloud API gateway to aggregate response from couple of microservice and provide it to consumer , wheather api gateway is the place to do aggregation is is different question but if we have to do it is good to use reactor/webflux? is there any out of the box feature spring or any other framework provide? so we are thinking to AKKA (i understanding it will be blocking) or reactor .. any advise or reference will be helpful
2 replies
pandepra
@pandepra
Hi all, what's the best way to gracefully terminate a reactive pipeline in reactor? Calling subscription.dispose() doesn't seem to be handling in-flight events properly. I want the pipeline to finish processing any in-flight events before cancelling the subscription.
1 reply
I. Sokolov
@Happy-Neko
What happens with remaining items after GroupedFlux is canceled? Does groupBy operator re-queue them into new group with same key? (More context in SO question https://stackoverflow.com/questions/66911578)
ruslan
@unoexperto
Folks, what would be idiomatic way to parallelized processing of items but ensure items of the same category are not processed by different threads concurrently ? Imagine I have Flux<Pair<Int, VALUE>> and it emits items such as (1, One), (1, Two), (2, Three), (1, Four), (2, Five). There are separate consumers for each Int. I want all values with 1s and 2s to be processed in parallel, but items of each group must be processed sequentially. Shall I use .window or .groupBy ? Carnality of Int is reasonable, less than 300. But I'm concerned about buffering of items of each group, they can be pretty big.
1 reply
yohasakur
@yohasakur
Hello, I am using webflux and mono filePart for uploading files but the maximum concurrent upload requests is only 4 the number of reactor-http-epoll, is there a better way or am I doing something wrong, please help, thanks.
spartanhooah
@spartanhooah

Anyone familiar with Spock here? I'm taking a class on Reactive Spring, and I've been able to use Spock no problem so far, but now there's a test using StepVerifier.withVirtualTime() and it passes no matter what. Here's the whole test:

def "With virtual time"() {
    given:
    VirtualTimeScheduler.getOrSet()
    def longFlux = Flux.interval(Duration.ofSeconds(1))
        .take(3)
        .log()

    expect:
    StepVerifier.withVirtualTime({ longFlux.log() })
        .expectSubscription()
        .thenAwait(Duration.ofSeconds(3))
        .expectNext(0L, 1L, 2L)
        .verifyComplete()
}

I also don't see the logs from the flux. Any ideas?

shawshawwan
@ZoeShaw101
Anyone knows what could cause this problem ??
截屏2021-04-05 下午12.09.21.png
The connection observed an error: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpRequest, state: 1
shawshawwan
@ZoeShaw101
@violetagg
shawshawwan
@ZoeShaw101
Using 0.9.4.RELEASE
using http client to send post request
Violeta Georgieva
@violetagg
@ZoeShaw101 Can you try with the latest release (0.9.4 is more than an year old)
shawshawwan
@ZoeShaw101
@ZoeShaw101 Can you try with the latest release (0.9.4 is more than an year old)
@violetagg Is this problem netty-reactor 's bug? Is there any issue related?
@violetagg
Violeta Georgieva
@violetagg
Cannot say based only on the exception
with an upgrade you can at least eliminate the difference between 0.9.4 and 0.9.18
as eventually it might be fixed
if it is RN bug of course
shawshawwan
@ZoeShaw101
@violetagg what's RN bug ?
Violeta Georgieva
@violetagg
Reactor Netty bug
shawshawwan
@ZoeShaw101
ok, we can ensure our http server backend is working fine, just cannot figure out why there's connection error in out production env
shawshawwan
@ZoeShaw101
Can you show me any issue about this problem? @violetagg
Violeta Georgieva
@violetagg
Unfortunately I cannot as I don’t know your scenario, you provided only a stacktrace, but the cause of this stacktrace is not one and the same :(
Violeta Georgieva
@violetagg
is it possible that the server sometimes sends incorrect response?
look at this issue reactor/reactor-netty#1057
here the server was sending a body with a response where body is not expected
shawshawwan
@ZoeShaw101
@violetagg thanks, just want to know what's this error mean usually? Or what server error can lead to this IllegalStateException error ?
java.lang.IllegalStateException: unexpected message type: DefaultFullHttpRequest, state: 1
Violeta Georgieva
@violetagg
@ZoeShaw101 check the previous request, it seems the connection is in a bad state
if you have logs you can trace the connection with its ID
it is the ID here [id: 329c6ffd, L:/0:0:0:0:0:0:0:1:64286 ! R:/0:0:0:0:0:0:0:1:64284]
Syed Mainul Hasan
@mainul35

Hello folks, I have a code done in imperative style, where I did some database checks on ClientInfo entity object with mac address and username. If it esists in DB, I am doing something, and if it doesn't I am trying to do something else and return the ClientInfoModel object, which is nothing but the DTO. My previous code is as follows.

public ClientInfoModel registerClient(ClientInfoModel clientInfoModel) {
        Optional<ClientInfo> clientInfoOptional = clientInfoRepository
                .findByMacAndUsername(clientInfoModel.getMac(), clientInfoModel.getUsername());
        ClientInfo clientInfo;
        if (clientInfoOptional.isEmpty()) {
            clientInfo = new ClientInfo();
        } else {
            clientInfo = clientInfoOptional.get();
        }
        BeanUtils.copyProperties(clientInfoModel, clientInfo);
        clientInfoRepository.save(clientInfo);
        return clientInfoModel;
    }

Now I want to change this code to reactive style. I have used R2DBC driver for MariaDB and used it's instance. Note that I have another DB call to save the object after this check. Can someone help me the appropriate way to write this code to reactive style?

    public Mono<ClientInfoModel> registerClient(ClientInfoModel clientInfoModel) {
        Mono<ClientInfo> clientInfoMono = template.selectOne(
                query(
                    where("mac").is(clientInfoModel.getMac())
                    .and("username").is(clientInfoModel.getUsername())
                ), ClientInfo.class);

        return clientInfoMono.zipWith(clientInfo -> {
            if (clientInfo != null) {
                BeanUtils.copyProperties(clientInfoModel, clientInfo);
                clientInfoRepository.save(clientInfo);
            } else {
                ClientInfo clientInfo1 = new ClientInfo();
                BeanUtils.copyProperties(clientInfoModel, clientInfo1);
                clientInfoRepository.save(clientInfo1);
            }
            return Mono.just(clientInfoModel);
        });
}

Note that it is giving me an error on calling BeanUtils.copyProperties(source, target), also I think the save methods was not called in proper reactive style.

Moncef AOUDIA
@aoudiamoncef

Hi @mainul35,

    public Mono<ClientInfoModel> registerClient(ClientInfoModel clientInfoModel) {
        return this.template.selectOne(query(where("mac").is(clientInfoModel.getMac()).and("username").is(clientInfoModel.getUsername())), ClientInfo.class)
                .flatMap(clientInfo -> this.clientInfoRepository.save(this.mapToEntity(clientInfo, clientInfoModel)))
                .switchIfEmpty(this.clientInfoRepository.save(this.mapNewToEntity(clientInfoModel)));
    }

In your mapping function, you mustn't lose the reference to your entity object. Try to use MapStruct to avoid using reflection with BeanUtils.copyProperties(clientInfoModel, clientInfo1);

@Mapper(componentModel = "spring")
public interface ClientInfoMapper {
    Client toEntity(@MappingTarget final Client entity, final ClientInfoModel dto);
}
Syed Mainul Hasan
@mainul35

Hi @mainul35,

    public Mono<ClientInfoModel> registerClient(ClientInfoModel clientInfoModel) {
        return this.template.selectOne(query(where("mac").is(clientInfoModel.getMac()).and("username").is(clientInfoModel.getUsername())), ClientInfo.class)
                .flatMap(clientInfo -> this.clientInfoRepository.save(this.mapToEntity(clientInfo, clientInfoModel)))
                .switchIfEmpty(this.clientInfoRepository.save(this.mapNewToEntity(clientInfoModel)));
    }

In your mapping function, you mustn't lose the reference to your entity object. Try to use MapStruct to avoid using reflection with BeanUtils.copyProperties(clientInfoModel, clientInfo1);

@Mapper(componentModel = "spring")
public interface ClientInfoMapper {
    Client toEntity(@MappingTarget final Client entity, final ClientInfoModel dto);
}

Thank you so much ^_^

shawshawwan
@ZoeShaw101
@violetagg hi, I update from 0.9.4 to 0.9.18.RELEASE ,but got NoClassDefFoundError: reactor/util/retry/Retry error, refer to reactor/reactor-netty#1207 this issue, I added <reactor-bom.version>Dysprosium-SR9</reactor-bom.version> in my pom.xml, but it doesn't work for me? Did I miss anything? thanks!
10 replies
Syed Mainul Hasan
@mainul35

Hello folks, I had the controller code as below, which I am trying to rewrite in reactive style. Can someone guide me for the best practice? My Point for this code is always giving the front end developer a common way of receiving the response. Please correct me if I am wrong.

@PostMapping("/register")
    public ResponseEntity<?> registerClient(@RequestBody ClientInfoModel clientInfoModel) {
        ResponseModel<ClientInfoModel> responseModel = new ResponseModel<>();
        responseModel.setMessage("Successfully registered the client");
        responseModel.setData(clientInfoService.registerClient(clientInfoModel));
        return ResponseEntity.ok(responseModel);
    }

Note that clientInfoService.registerClient(clientInfoModel) returns a mono of ClientInfoModel.

bvenkatbm
@bvenkatbm
Hey Everyone, which version of spring boot is supported by projectreactor?
1 reply
shawshawwan
@ZoeShaw101
with an upgrade you can at least eliminate the difference between 0.9.4 and 0.9.18
@violetagg Hi, It seems upgrade 0.9.4 to 0.9.18 doesn't fix this error : java.lang.IllegalStateException: unexpected message type: DefaultFullHttpRequest, state: 1
4 replies
Michael Hashe
@MichaelHashe
Hi all, general question on multi-step protocols here. I'm trying to write a TcpServer that can handle a certain protocol X. X is a rather chatty protocol, and involves multiple calls-and-responses from a client within a single transaction. There is no set number of these (several of them are repeatable / skippable), but they are ordered and should share context (i.e., info sent in step 1 should be accessible in step 2). We have no control over X itself. How would one go about framing this in this library? I can think of the following for organization, but I haven't had much luck getting anything to work yet.
TcpServer.create()
    .host(HOST)
    .port(PORT)
    .doOnConnection(connection -> {
        // Could add chain of handlers here, one for each step. Would this be reactive? Is there a way to share context?
    })
    .handle((in, out) -> {
        // AFAIK we can only call in.receive() once here. Is there a good way to expand this to multiple steps? 
    })
    .bindNow();
3 replies
Viral Patel
@pviral

Hi all, I am new to Spring Webflux and reactor framework. I would like to know when a user calls my endpoint and the handler I have written is in async fashion, how can i return result back to user?
Here is handler code:

public Mono<ServerResponse> listAll(ServerRequest serverRequest) {
    return ServerResponse.ok().build().doOnNext(r -> Mono.fromCallable(() -> {
        // DAO is synch library included in API service
        return userDAO.findAll();
    }).log()
       .subscribeOn(Schedulers.parallel())
       .subscribe(System.out::println));
}

Routing file:


public RouterFunction<ServerResponse> route(UserHandler userHandler) {
        return RouterFunctions
                .route(GET("/api/v1.0/users").and(accept(MediaType.APPLICATION_JSON)), userHandler::listAll)

The client get immediate 200 OK, however, I would like send the result to client once I get result from DB in async fashion.. any suggestions?

3 replies
Archit Agarwal
@architmmmec_twitter
Hi All, I am using groupBy in a flux, how can I test this functionality?
1 reply
Viral Patel
@pviral
Screen Shot 2021-04-09 at 4.22.30 PM.png
Hi @aoudiamoncef , for some reason gitter won't allow to paster screenshot in thread..
also I notice that in forloop when first request is completed then the next requested gets picked up.. and trying to understand why? Normally, I thought both the request should have picked up by different threads at the same time no?
Moncef AOUDIA
@aoudiamoncef
By default, you are using Accept: application/json header, as a result you will get response in blocking manner which is not recommended. I added support for both for demo purpose and let you see the difference. Add Accept: text/event-stream to your curl commande to use Server Sent Event. If you have a lot of data returned in your stream, curl will close the stream arbitrary. it's seems to be a security lock to avoid consuming an infinite stream.
Vaibhav Agnihotri
@vaibhavagnihotri
Hey I'm using CacheMono with Caffeine. Not able to figure out how to record cache hit/miss metrics. Along with pushing it to statsd I want to log hits and misses. Is there any out-of-the box solution for that?
2 replies
ChandanPadalkar
@ChandanPadalkar
I have spring boot with microservices application. I am using WebClient to call between microservices. When I have two different microservice calls in single reactive chain, webclient is failing to do call to microservices. I am getting 404 http status code. I am using WebClient. Builder with @LoadBalanced annotation as I am using Eureka Service Registry
@LoadBalanced @Bean WebClient.Builder webClientBuilder() { return WebClient.builder(); }
numbertalk
@numbertalk

I have some Reactor Kafka code that reads in events via a KafkaReceiver and writes 1..many downstream messages via 1or more KafkaSenders that are concatenated into a single Publisher. Everything is working great, but what I'd like to do is only emit an event from this concatenated senders Fluxwhen it is complete (i.e. it's done writing to all downstream topics, so not emit anything for each element as it writes to Kafka downstream). This way I could sample() and periodically commit offsets, knowing that whenever it is that sample() happens to trigger and I commit offsets for an incoming event that I've processed all downstream messages for each event I'm committing offsets for. It seems like I could use either pauseUntilOther() or then() somehow, but I don't quite see exactly how. Anyone have any thoughts?

Main Publisher code:

this.kafkaReceiver.receive()
        .groupBy(m -> m.receiverOffset().topicPartition())
        .flatMap(partitionFlux ->
                partitionFlux.publishOn(this.scheduler)
                        .flatMap(this::processEvent)
                        .sample(Duration.ofMillis(10))
                        .concatMap(sr -> commitReceiverOffset(sr.correlationMetadata())))
        .subscribe();

Concatenated KafkaSenders returned by call to processEvent()

return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
        .doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event);
2 replies
jay
@legendjaks
What is the StreamBridge equivalent in reactive kafka to send on-demand messages?