Michael Ravits
@michaelr524_gitlab
Only one connection receive subscriber allowed
Michael Ravits
@michaelr524_gitlab
.share() helped
@bartveenstra thanks again. The zipWith method works for me and I got rid of the context object. One downside, though, is that the tuple accessors getT1() and getT2() make the code much less readable.
Bart Veenstra
@bartveenstra
You can make it more readable by using BiFunctions.
Usually, when I get a T1 or T2, I use the first two lines to put them back into readably named objects again.
Michael Ravits
@michaelr524_gitlab
Trying to stay tidy with the code where possible. Coming from Clojure, I appreciate the clean functional style.
I'll try with a BiFunction lambda.
No, it doesn't work the way I hoped:
request.zipWith(response)
                .flatMap((req, resp) -> saveStockLock(req, resp))
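The snippet above does not compile because `flatMap` takes a one-argument `Function` over the zipped `Tuple2`, not a two-argument lambda. A minimal sketch of two ways around that with reactor-core only. The `String` request/response types and the body of `saveStockLock` are assumptions made for illustration, not the code from the chat:

```java
import reactor.core.publisher.Mono;

final class ZipThenFlatMap {

    // Hypothetical stand-in for the saveStockLock method mentioned in the chat.
    static Mono<String> saveStockLock(String req, String resp) {
        return Mono.just(req + "/" + resp);
    }

    // Option 1: unpack the Tuple2 inside a one-argument lambda.
    static Mono<String> viaTuple(Mono<String> request, Mono<String> response) {
        return request.zipWith(response)
                .flatMap(t -> saveStockLock(t.getT1(), t.getT2()));
    }

    // Option 2: hand zipWith a BiFunction combinator. Because the combinator
    // itself returns a Mono, the result is a Mono<Mono<String>> that still
    // needs flattening.
    static Mono<String> viaCombinator(Mono<String> request, Mono<String> response) {
        return request.zipWith(response, ZipThenFlatMap::saveStockLock)
                .flatMap(inner -> inner);
    }
}
```

If the reactor-extra addon is on the classpath, its `TupleUtils.function` adapter should allow the two-argument lambda to be kept as written, wrapped as `flatMap(TupleUtils.function((req, resp) -> ...))`.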
Bart Veenstra
@bartveenstra
public Mono<ServiceAgreement> createServiceAgreement(StreamTask streamTask, ServiceAgreement serviceAgreement) {
    ServicesAgreementIngest servicesAgreementIngest = accessGroupMapper.toPresentation(serviceAgreement);
    return accessGroupServiceApi.postServiceagreements(servicesAgreementIngest)
        .onErrorResume(WebClientResponseException.class, throwable -> {
            streamTask.error("service-agreement", "create", "failed", serviceAgreement.getExternalId(),
                "", throwable, throwable.getResponseBodyAsString(), "Failed to create Service Agreement");
            return Mono.error(new StreamTaskException(streamTask, throwable, "Failed to create Service Agreement"));
        })
        .zipWith(Mono.just(serviceAgreement), storeIdInServiceAgreement());
}

private BiFunction<IdItem, ServiceAgreement, ServiceAgreement> storeIdInServiceAgreement() {
    return (idItem, serviceAgreement1) -> {
        serviceAgreement1.setInternalId(idItem.getId());
        log.info("Created Service Account: {} with id: {}", serviceAgreement1.getName(), idItem.getId());
        return serviceAgreement1;
    };
}
I use it like this sometimes
Michael Ravits
@michaelr524_gitlab
You zip the data with a function
Bart Veenstra
@bartveenstra
For me it’s also a learning process :)
Michael Ravits
@michaelr524_gitlab
So you just call this function somewhere down the chain?
Bart Veenstra
@bartveenstra
yeah. The chain is pretty big. I try to create as many small methods as possible so it reads kinda like a story
Michael Ravits
@michaelr524_gitlab
I wonder if there is any place to look for good code examples to learn from
Daniil Mikhaylov
@mdsina
Hi there!
I have a problem with a pipeline hanging. How do I debug issues like that? We aren't using Mono/Flux.create.
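One common way to narrow down a hang is to instrument the suspect stage with `log()`, `timeout()` and `checkpoint()`. This is a rough sketch only: `stuckStage` is a hypothetical stand-in that never emits, and the 200 ms timeout is an arbitrary value picked for the example:

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

final class HangingPipelineDebug {

    // Hypothetical stage that never emits, standing in for the hanging step.
    static Mono<String> stuckStage() {
        return Mono.never();
    }

    static Mono<String> instrumented() {
        return stuckStage()
                // Logs the signals seen at this point (onSubscribe, request, cancel, ...).
                .log("stuck-stage")
                // Turns prolonged silence into a TimeoutException instead of waiting forever.
                .timeout(Duration.ofMillis(200))
                // Attaches this description to errors that pass through here.
                .checkpoint("after stuck-stage");
    }

    public static void main(String[] args) {
        try {
            instrumented().block();
        } catch (RuntimeException e) {
            // block() rethrows the checked TimeoutException wrapped in a RuntimeException.
            System.out.println("Pipeline failed: " + e);
        }
    }
}
```

Moving the `log()` call up or down the chain shows which stage is the last one to receive a `request` without answering it.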
kvandenbroek
@kvandenbroek
Good morning! When we use WebClient with .metrics(true), our Micrometer metrics explode: each URL with a different path param becomes its own metric. We expected these to aggregate under the endpoint URL rather than create a unique metric every time. We use WebClient like so:
webClient
        .get()
        .uri(
            builder -> {
              //.. code here
              return builder.build(params);
            })
Does anyone have an idea how to solve this (without adding filters to Micrometer, since that takes away all the useful metrics)?
violetagg
@violetagg
@kvandenbroek there is an issue about this
Currently Reactor Netty doesn’t have information for the path params that you define with WebClient
thus it cannot aggregate them as one and the same url
kvandenbroek
@kvandenbroek
Hmm, do you have a link to this issue?
Anything I can change to fix this in the meantime?
violetagg
@violetagg
Do you need all metrics or only the connection pool ones?
there is no workaround
We need to expose an API so that WebClient can provide this information
kvandenbroek
@kvandenbroek
I'd like to have metrics for the clients to create p90/p99 etc graphs
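Note for later readers: to my knowledge, Reactor Netty releases newer than the one discussed here accept a URI tag function via `HttpClient.metrics(true, uriTagValue)`, which lets the application collapse concrete paths into a template itself. Below is a minimal, JDK-only sketch of such a function. The class name, the `{id}` placeholder and the assumption that variable segments are purely numeric are all illustrative choices, not anything from the chat:

```java
import java.util.function.Function;
import java.util.regex.Pattern;

final class UriTagNormalizer {

    // Assumption: variable path segments are purely numeric IDs.
    private static final Pattern NUMERIC_SEGMENT = Pattern.compile("/\\d+(?=/|$)");

    // Maps a concrete path such as /orders/42/items/7 to /orders/{id}/items/{id},
    // so that all calls to the same endpoint share one tag value.
    static final Function<String, String> TEMPLATE =
            path -> NUMERIC_SEGMENT.matcher(path).replaceAll("/{id}");
}
```

Paths without numeric segments pass through unchanged, so fixed endpoints keep their own tag values.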
arun6777
@arun6777
Could anybody help with creating a filter in a Spring reactive RestController model?
bhhoffmann
@bhhoffmann

Hi, everyone

TL;DR: Adding a SecurityWebFilterChain @Bean in my security config made my rest controller start executing requests on boundedElastic threads and I can't understand why and how to avoid this (assuming it should be avoided).

At work I use Spring Boot with Webflux. I wanted to fool around a bit with the framework to test a couple of things, the threading model, schedulers and wrapping blocking calls to mention some, so I decided to create a new project from scratch to make things a bit simpler. I didn't get far before I discovered something that I find a bit peculiar, and that I'm hoping someone here can help me sort out.

I started out creating a simple rest controller:

@RestController
public class SimpleController {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @GetMapping("/thread")
    public Mono<ResponseEntity<String>> thread() {

        logger.info("Received a GET request on endpoint /thread");

        return Mono.just("Hello from ThreadsApplication")
                .doOnNext(msg -> logger.info("Controller doing its work on thread: {}", Thread.currentThread().getName()))
                .map(msg -> {
                    logger.info("Returning the message: {}", msg);
                    return new ResponseEntity<>(msg, HttpStatus.OK);
                });
    }
}

All fine and dandy, incoming requests are executed on reactor-http-epoll threads.

Then I wanted to add some security, so I added spring-boot-starter-security to my POM and set up the following configuration:

@Configuration
@EnableWebFluxSecurity
public class SecurityConfig {

    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder();
    }

    @Bean
    public MapReactiveUserDetailsService userDetailsService() {
        UserDetails user = User
                .withUsername("a")
                .password(passwordEncoder().encode("a"))
                .roles("USER")
                .build();
        return new MapReactiveUserDetailsService(user);
    }
}

Okay, so now security is activated and you have to provide credentials to access the rest controller's endpoints.

The next step was to play with permissions, so I started simple and added this to the security configuration, effectively "deactivating" it again:

    @Bean
    public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {

        http.authorizeExchange()
                .anyExchange()
                .permitAll();

        return http.build();
    }

But suddenly, after adding this, all requests received in the controller started running on boundedElastic threads?! Why does this happen? Is the SecurityWebFilterChain somehow blocking, triggering the use of boundedElastic through some auto-configuring? I thought one of the principles behind Webflux was having an event loop with a limited set of threads that do the main work (reactor-http threads), and offload some work to e.g. boundedElastic if needed, like if you need to do blocking calls. It thus seems very strange that the rest controller suddenly started using boundedElastic threads "by default".

I hope someone can help alleviate my confusion.

The application can be found in its entirety here: https://github.com/bhhoffmann/webflux-threads (PS: The code that makes the rest controller run on boundedElastic threads is commented out in the SecurityConfig class)

Michael Ravits
@michaelr524_gitlab
hmm
Is there a zipWith method that can zip multiple items into an N-tuple?
Michael Ravits
@michaelr524_gitlab
ahhh found it
it's a static method
Flux.zip
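A short sketch of the static `Flux.zip` with three sources, which yields a `Tuple3` per combined element (`Mono.zip` has the same shape). The sample data and the `summaries` method are made up for illustration:

```java
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple3;

final class ZipThree {

    static Flux<String> summaries() {
        // Hypothetical sources; any three publishers would do.
        Flux<String> names = Flux.just("widget", "gadget");
        Flux<Integer> quantities = Flux.just(3, 5);
        Flux<Boolean> inStock = Flux.just(true, false);

        // The static zip has overloads for two to eight sources (Tuple2 to Tuple8).
        Flux<Tuple3<String, Integer, Boolean>> zipped = Flux.zip(names, quantities, inStock);

        // Elements are paired positionally: first with first, second with second.
        return zipped.map(t -> t.getT1() + " x" + t.getT2() + " inStock=" + t.getT3());
    }
}
```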
Kirill Marchuk
@62mkv
@bhhoffmann I guess it's not the best room, maybe ask in webflux community?
Mikael Elm
@mickeelm
Yes, you might want to try https://gitter.im/spring-projects/spring-boot for this one (there is no specific webflux-chat/community)
Michael McFadyen
@michaelmcfadyen
Hey, we have a WebFlux application that we deploy to a Kubernetes environment. The pod itself is given a certain amount of CPU (let's say 2 cores). The underlying VM that the pod is running on has many more cores (let's say 32). As a result, the number of worker threads in the event loop will be 32, so we have 32 threads all competing for 2 cores' worth of CPU. First of all, is this a correct assumption? And if so, could this lead to performance issues or have any negative side effects?
Rajeev Chowdary Gurram
@rajeevgurram
@michaelmcfadyen your webflux application gets 2 cores, so total threads will be 4
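Some background on where the 4 likely comes from, stated as my understanding rather than a guarantee: container-aware JVMs (Java 10+, and 8u191+) derive `Runtime.availableProcessors()` from the container's CPU limit rather than the host's core count, and Reactor Netty sizes its default event loop as the larger of that value and 4, unless the `reactor.netty.ioWorkerCount` system property overrides it. A tiny sketch that prints what the JVM sees inside the pod; the class and method names are invented for the example:

```java
final class EventLoopSizing {

    // Mirrors the default sizing rule as described above: max(processors, 4).
    // Treat the rule itself as an assumption to verify against your version.
    static int defaultWorkerCount(int availableProcessors) {
        return Math.max(availableProcessors, 4);
    }

    public static void main(String[] args) {
        int procs = Runtime.getRuntime().availableProcessors();
        System.out.println("JVM sees " + procs + " processors");
        System.out.println("Expected event-loop workers: " + defaultWorkerCount(procs));
    }
}
```

If this prints the host's 32 cores inside the pod, the pod probably has no CPU limit set or the JVM is too old to be container-aware, and the original concern about 32 threads on 2 cores would apply.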
Gaurav
@rgbk21
Hi, I have a question -
Can anybody please explain to me why, in the following code, every item in the range is executed on the same thread? Shouldn't publishOn() create a new thread for each item from range(), thus executing the code in 1 second rather than 10 seconds? I get that you can use parallel() with runOn() to map each item to a separate thread, but I do not understand why I cannot see the same behavior with publishOn().
        Flux.range(1,10)
                .publishOn(Schedulers.boundedElastic())
                .doOnNext(count -> {
                    logger.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
                    try {
                        Thread.sleep(1_000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                })
                .subscribe();
Rajeev Chowdary Gurram
@rajeevgurram
How do I configure WebClient to open X TCP connections at any given time?
Michael McFadyen
@michaelmcfadyen
@rgbk21 this gives a pretty good explanation and example of what you are trying to achieve
https://stackoverflow.com/questions/49489348/project-reactor-parallel-execution
Gaurav
@rgbk21
Ah, OK. I think I see it now. Thanks @michaelmcfadyen! That helped.
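For readers who skip the link: `publishOn` moves everything downstream onto one worker of the given scheduler, and that worker still handles the items one after another. A minimal sketch of one way to get concurrency instead, wrapping each item in its own inner `Mono` subscribed on `boundedElastic` and merging with `flatMap`. The class name and the 100 ms sleep are stand-ins chosen for this example:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

final class ConcurrentSleep {

    // Stand-in for the blocking work in the question (shortened to 100 ms).
    static int blockingWork(int count) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing it.
            Thread.currentThread().interrupt();
        }
        return count;
    }

    // Each item becomes its own inner Mono subscribed on boundedElastic, so
    // flatMap can run several of them at the same time. Results arrive in
    // completion order, which is not necessarily the source order.
    static Flux<Integer> concurrent() {
        return Flux.range(1, 10)
                .flatMap(count -> Mono.fromCallable(() -> blockingWork(count))
                        .subscribeOn(Schedulers.boundedElastic()));
    }
}
```

The `parallel().runOn(...)` route mentioned in the question is the other usual option; `flatMap` tends to fit better when each item wraps a blocking call.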
atarutin
@atarutin
        Flux<B> rf = Flux.fromIterable(bs)
                .scanWith(Flux::<B>empty, (bf, b) -> bf
                        .filter(bi -> bi.getId().equals(b.getParentId()))
                        .switchMap(bi -> {
                            log.debug("M: {} <= {}/{}", bi.getId(), b.getId(), b.getParentId());
                            b.setParent(bi);
                            bi.getChildren().add(b);

                            return Flux.just(b);
                        })
                        .defaultIfEmpty(b))
                .flatMap(bf -> bf);
This isn't doing what I thought it would do:
none of the fields are set
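As far as I can tell from the snippet, each accumulated Flux only ever holds the single previous element, so a parent is found only when it immediately precedes its child, and nothing runs at all until `rf` is subscribed. Below is a plain-Java alternative, not a fix of the `scanWith` approach: index every element by id first, then link. `Node` is a hypothetical stand-in for the `B` type, with `String` ids assumed:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ParentLinker {

    // Hypothetical stand-in for the B type in the chat.
    static final class Node {
        final String id;
        final String parentId;
        Node parent;
        final List<Node> children = new ArrayList<>();

        Node(String id, String parentId) {
            this.id = id;
            this.parentId = parentId;
        }
    }

    // Indexes every node by id, then links each node to its parent in a
    // second pass, so the order of the input list does not matter.
    static List<Node> link(List<Node> nodes) {
        Map<String, Node> byId = new HashMap<>();
        for (Node n : nodes) {
            byId.put(n.id, n);
        }
        for (Node n : nodes) {
            Node p = byId.get(n.parentId);
            if (p != null) {
                n.parent = p;
                p.children.add(n);
            }
        }
        return nodes;
    }
}
```

If a Flux is still needed downstream, the linked list can be wrapped afterwards with `Flux.fromIterable(ParentLinker.link(nodes))`.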
kvandenbroek
@kvandenbroek
Hey guys, we are converting our app to run fully on Netty with Spring WebFlux. We are almost done, but I still need to convert our Excel download (we expose an Excel file) to reactive types. We store data in a cloud bucket, so we have access to a ByteArrayOutputStream (extends OutputStream). Does someone have an example of how to expose an OutputStream with WebFlux?
violetagg
@violetagg
@kvandenbroek please use Spring Boot/Framework channels for questions related to WebFlux
kvandenbroek
@kvandenbroek
Alright, thanks