Peter Jurkovic
@peterjurkovic
Scheduler question - What will happen with in-flight messages when an application receives a shutdown signal? Does the scheduler await termination of all the tasks? (It is not quite clear from the docs)
6 replies
Michael Nitschinger
@daschl
hey folks, is there a good strategy to debug why a repeatWhen operator occasionally results in an Exceptions.failWithOverflow("Could not emit value due to lack of requests") (which is coming from MonoDelay, I think)?
2 replies
Andrei Kovalevsky
@spyroid
Hi, is there any possibility to make bufferTimeout() support backpressure?
5 replies
smoomrik
@smoomrik

Hi all. I've run into behavior that's strange to me. I would much appreciate it if someone could help me understand it.
I have a cold publisher and parallel execution on a separate scheduler:

Flux.<List<Message>>generate(sink -> {
            // fetch messages
            // messages size from 1 to 10.
            sink.next(messages);
        })
        .flatMapIterable(Function.identity())
        .doOnError(this::processError)
        .retry()
        .subscribeOn(subscribePool)
        .flatMap(message -> consumeMessageMono(message)
                .onErrorResume(throwable -> handleMessageProcessingError(throwable, message))
                .subscribeOn(processingPool), 10)
        .bufferTimeout(10, 200)
        .flatMap(messages -> finalMono(messages)
                .onErrorResume(throwable -> {
                    log.error("Error:[{}]", throwable.getMessage(), throwable);
                    return Mono.empty();
                })
                .subscribeOn(subscribePool))
        .subscribeOn(subscribePool)
        .subscribe();

where processingPool is defined as Schedulers.newBoundedElastic(10, 1, "process");
Sometimes I receive the following error: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (2/1), which indicates that something was submitted to the processing pool even though that shouldn't be possible.

2 replies
Jeevjyot Singh Chhabda
@jeevjyot
Hi Folks,
Trying to use FluxSink and not quite sure I understand the onRequest method (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/FluxSink.html#onRequest-java.util.function.LongConsumer-). Could anyone please shed some light on how the request count is determined?
6 replies
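For the thread above, a minimal sketch of what onRequest reports: the LongConsumer receives the demand signalled by the downstream subscriber, so an unbounded subscribe() shows up as Long.MAX_VALUE while a prefetch-bounded operator like limitRate shows up as its prefetch (the 256 mentioned below is just another operator's default prefetch).

```java
import reactor.core.publisher.Flux;
import java.util.ArrayList;
import java.util.List;

List<Long> seen = new ArrayList<>();
Flux<Integer> source = Flux.create(sink -> {
    sink.onRequest(seen::add);   // called with the demand signalled from downstream
    sink.next(1);
    sink.complete();
});

source.subscribe();               // unbounded subscriber -> Long.MAX_VALUE
source.limitRate(10).subscribe(); // prefetch-bounded -> 10
```

Each subscription re-runs the create consumer, so `seen` collects one demand value per subscriber.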
Jeevjyot Singh Chhabda
@jeevjyot
Ah, I see, understood! Thank you. When I was testing through StepVerifier the initial request count was 256. Maybe I am missing something.
Jeevjyot Singh Chhabda
@jeevjyot
Also, another naive question: what would be the best way to keep polling a queue and reading the messages?
One way I was thinking of is to use repeat() indefinitely. Any other suggestions?
4 replies
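A minimal repeat() polling sketch for the question above (the in-memory queue stands in for whatever you poll, e.g. SQS): each subscription polls once, repeat() re-subscribes indefinitely, and the Optional sentinel is only there so this demo terminates.

```java
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

Queue<String> queue = new ConcurrentLinkedQueue<>(List.of("a", "b", "c"));

List<String> drained = Mono.fromCallable(() -> Optional.ofNullable(queue.poll()))
        .repeat()                          // re-subscribe after each completed poll
        .takeWhile(Optional::isPresent)    // demo only: stop once the queue is empty
        .map(Optional::get)
        .collectList()
        .block();
```

In a real poller you would not stop on empty but back off instead, e.g. with repeatWhen plus a delay, so an empty queue doesn't spin.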
smoomrik
@smoomrik
So many difficulties users face when trying to implement SQS queue polling using Reactor. Looks like someone should just create a library for that :)
shawshawwan
@ZoeShaw101
Screenshot 2021-04-29 4.57.11 PM.png
Anyone know what case will cause this error?
Suppressed: io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: ¾ܾøl: /11.164.62.87:8080 — don't know what ¾ܾøl is?
Andrei Kovalevsky
@spyroid
    sink = Sinks.many().multicast().onBackpressureBuffer();
    sink.asFlux()
            .doOnNext(message -> counter.incrementAndGet())
            .bufferTimeout(300, Duration.ofMillis(1))
            .parallel()
            .runOn(Schedulers.newParallel("MAIN-POOL", 20))
            .doOnNext(messages -> LockSupport.parkNanos(random.nextInt(200) * 1_000_000)) // processing
            .flatMap(messages -> Flux.fromIterable(splitArray(messages)))
            .runOn(Schedulers.newParallel("DB-POOL", 10))
            .doOnNext(messages -> LockSupport.parkNanos(random.nextInt(1000) * 1_000_000)) // persisting
            .subscribe(messages -> counter.addAndGet(-messages.size()));

    Schedulers.parallel().schedule(() -> {
        for (int i = 0; i < 500_000; i++) {
            while (sink.tryEmitNext("O-" + i).isFailure()) LockSupport.parkNanos(1_000_000);
        }
    });
How to make that sink accept elements based on the workers' load? Currently the counter value increases up to 500k. How do I decrease the number of messages in the sink?
Kieran Boyle
@k-boyle
image.png

Hello, I'm writing a command framework and I'm struggling to come up with a clean solution to the following problem. I start with N potential matches for a given input, and each match needs to go through various stages before it gets deemed "the valid match". In a scenario where I have a valid match I don't care about the rest; however, in the scenario where none are valid I'd like to return the various failures (hopefully this diagram helps). I've written some code that works, alas it commits war crimes:

var failures = new ArrayList<Result>(matches.size());
return Flux.fromIterable(matches)
    .flatMap(
        match -> {
            context.command = match.command();
            return match.command().runPreconditions(context)
                .doOnNext(result -> {
                    if (!result.success()) {
                        failures.add(result);
                    }
                })
                .filter(Result::success)
                .map(success -> tokeniser.tokenise(input, match));
        }
    )
    .doOnNext(result -> {
        if (!result.success()) {
            failures.add(result);
        }
    })
    .filter(Result::success)
    .flatMap(success -> argumentParser.parse(context, success.command(), success.tokens()))
    .doOnNext(result -> {
        if (!result.success()) {
            failures.add(result);
        }
    })
    .filter(Result::success)
    .next()
    .flatMap(success -> executeCommand(context, success))
    .switchIfEmpty(
        Mono.defer(() -> {
            context.command = null;
            return Mono.just(new CommandMatchFailedResult(failures));
        })
    );

I've been going through reactor docs and glaring at intellicode for hours now trying to find something that'd help but turned up nothing, any help would be appreciated, thank you!

Stanis Shkel
@sshkel

hi all, have been bashing my head against this for a couple of days. I have the following function
Mono<Void> handle(Blah blah). The function creates a Mono and attaches a couple of doFinally hooks to it.
Is there a way to attach a side effect to the Mono returned by the function that will execute after any doFinally hooks? It's something like

handle(blah).doStuffAfterAllHooks( () -> sideEffect());

Maybe I can break a boundary between Monos, and get away with doFinally, but I haven't found a way. Or maybe I am just crazy

tf318
@tf318
Hello. I have an infinite flux and a single subscriber, which may come and go (so is not perpetually subscribed). It is only interested in the latest value, but once it has been consumed I don't want it to continue to exist on the Flux (so if the subscriber goes away and comes back, the Flux should not redeliver the same element, as it was already consumed once). I'm having no luck with Sinks.many().unicast().onBackpressureBuffer(), as I think that somehow needs to work in concert with an OverflowStrategy of LATEST, but I can't see how to hook these up. Any guidance would be gratefully received!
Timur Shaidullin
@timur-sh

Hello!

I'm trying to insert a list of items into a PostgreSQL database via R2DBC, but it never completes. Configuration and example are below.

Could anyone advise why this code does not hit the database?

    private Flux<OnlineChartItem> batchInsert() {
        return Mono.from(chartInputConnectionFactory.create())
                .log("start#1")
                .flatMapMany(connection -> {
                    Statement statement = connection.createStatement(INSERT_INTO_ONLINE_CHARTS);
                    for (OnlineChartItem item : items) {
                        this.prepareInsertStatement(statement, item).add();
                    }
                    return Flux.from(statement.execute());
                })
                .thenMany(Flux.fromIterable(items));
    }

    @Bean
    public ConnectionFactory chartInputConnectionFactory(R2dbcProperties r2dbcProperties) {
        ConnectionFactoryOptions factoryOptions = ConnectionFactoryOptions.parse("r2dbc:pool:postgresql://localhost:5432/my_db")
                .mutate()
                .option(ConnectionFactoryOptions.USER, r2dbcProperties.getUsername())
                .option(ConnectionFactoryOptions.PASSWORD, r2dbcProperties.getPassword())
                .build();
        ConnectionFactory connectionFactory = ConnectionFactories.get(factoryOptions);

        R2dbcProperties.Pool pool = r2dbcProperties.getPool();
        ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder()
                .connectionFactory(connectionFactory)
                .maxSize(20)
                .initialSize(10)
                .acquireRetry(Integer.MAX_VALUE)
                .maxAcquireTime(Duration.ofSeconds(2))
                .validationQuery("select 1")
                .build();
        return new ConnectionPool(configuration);
    }
1 reply
sneha-2018
@sneha-2018

Hi team, I am working with Reactor Mono to do some retry operations in my API code; however, I want the handler to wait until it receives the response from the REST call and then return it. How can I do it?

I am facing an issue, as I need to extract response data from Mono<Response> and then return a response based on it.

Previously I was using .block():
Response r = monoMethod.block(), but this is a blocking call. Is there any other alternative to achieve this?

.subscribe() works, but I want the handler to wait here and then process the lines of code below this.
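For the question above, the usual answer in a reactive handler is to not extract the value at all: keep the "lines of code below" inside the chain as operators and return the Mono, letting the framework subscribe. A pure-Reactor sketch (the Mono.just stands in for the actual REST call):

```java
import reactor.core.publisher.Mono;

Mono<String> monoMethod = Mono.just("{\"status\":\"ok\"}");  // stand-in for the WebClient call

// instead of: Response r = monoMethod.block(); ...process r...
Mono<String> handlerResult = monoMethod
        .retry(3)                                 // the retry behaviour mentioned above
        .map(body -> "wrapped:" + body);          // the "code below" moves into operators
```

In Spring WebFlux the controller method would simply return `handlerResult` and never call block().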

yohasakur
@yohasakur
Hi, is there a way to gracefully shut down reactor-rabbitmq receivers? It's on a Netty server, if that's important.
Jeevjyot Singh Chhabda
@jeevjyot
Hi,
I am using reactor-kafka and having a hard time gracefully pausing and resuming consumption. I want to pause consumption until the current flux of messages has been processed. I have posted on Stack Overflow as well: https://stackoverflow.com/q/67495327/6495981 — any help would be great, thank you.
Knut Schleßelmann
@kschlesselmann

Hi! We upgraded to Spring Boot 2.4.5 here and now some services run into

WebClientRequestException: Pending acquire queue has reached its maximum size of 1000; nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000

Why did this behavior change? Do I simply want to increase this limit, or should I prefer some other way, such as concatMap or something like it, to limit the parallel requests?

16 replies
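Re the thread above: rather than raising the pending-acquire limit, the usual alternative is to cap how many requests are in flight with flatMap's concurrency argument (concatMap being the concurrency-of-1 case). A pure-Reactor sketch of the shape, with a fromCallable standing in for the WebClient call:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.List;

List<Integer> results = Flux.range(1, 100)
        .flatMap(i -> Mono.fromCallable(() -> i * 2)      // stand-in for the HTTP call
                        .subscribeOn(Schedulers.boundedElastic()),
                16)                                       // at most 16 subscriptions in flight
        .collectList()
        .block();
```

With the in-flight count bounded below the pool size, requests queue in the operator instead of in the connection pool's pending-acquire queue.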
Knut Schleßelmann
@kschlesselmann
Why is there no zipWhen on Flux?
angela b
@brew005_twitter
hello everyone - I'm having an issue with a client library sending WebClient requests. I'm getting the following error:
[INFO] Exception while notifying listener ResponseListenerProcessor@4a5ef9a2[Reactive[HttpRequest[POST ...HTTP/1.1]@51f7cbed]]
java.lang.NoSuchMethodError: org.springframework.http.client.reactive.ClientHttpResponse.getId()Ljava/lang/String;
at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.getLogPrefix (ExchangeFunctions.java:124)
1 reply
Guillaume DROUET
@gdrouet
Hello everyone, when a consumer is too slow compared to its publisher and messages start to be dropped, is it possible for the consumer to later receive new messages if it becomes faster? I would expect this behavior, but in my tests I don't get messages anymore after an overflow has occurred.
1 reply
PRASANNA CHEREDDY
@prasanna-chereddy
Hi everyone, how do I pass an x-www-form-urlencoded body using Reactor?
1 reply
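For the question above, if you're on Spring WebFlux the usual route is WebClient with BodyInserters.fromFormData, which encodes the fields as application/x-www-form-urlencoded. A sketch (the URL and field names are hypothetical):

```java
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

MultiValueMap<String, String> form = new LinkedMultiValueMap<>();
form.add("username", "alice");    // hypothetical field names
form.add("password", "secret");

Mono<String> response = WebClient.create("https://example.org")
        .post()
        .uri("/login")
        .contentType(MediaType.APPLICATION_FORM_URLENCODED)
        .body(BodyInserters.fromFormData(form))
        .retrieve()
        .bodyToMono(String.class);
```

With plain reactor-netty (no Spring), HttpClient's sendForm((req, form) -> form.attr(...)) is the rough equivalent.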
angela b
@brew005_twitter

Hello, I have a client library for a REST service implemented with WebClient. When I include the library from a Spring Boot application, the WebClient call works as expected. When I include the library in a Spring Tomcat application running Jetty, not all of the response is received and the code hangs.
trace from the Jetty application:

02:03:23.905 [reactor-http-kqueue-3] DEBUG r.n.http.client.HttpClientOperations [debug:249] [->::] - [id:78131242-1, L:/127.0.0.1:63326 - R:localhost/127.0.0.1:8084] Received response (auto-read:false) : [Content-Type=application/json, institutionId=00001, correlation-id=aa928b3e-058b-46c3-9f48-9205a6050813, content-length=6111]
02:03:23.905 [reactor-http-kqueue-3] DEBUG r.n.r.DefaultPooledConnectionProvider [debug:249] [->::] - [id:78131242-1, L:/127.0.0.1:63326 - R:localhost/127.0.0.1:8084] onStateChange(POST{uri=/v1/test, connection=PooledConnection{channel=[id: 0x78131242, L:/127.0.0.1:63326 - R:localhost/127.0.0.1:8084]}}, [response_received])
02:03:23.908 [reactor-http-kqueue-3] DEBUG reactor.netty.http.client.HttpClient [log:145] [->::] - [id:78131242-1, L:/127.0.0.1:63326 - R:localhost/127.0.0.1:8084] READ COMPLETE

trace from spring boot

2021-05-17 01:50:21.336 DEBUG 9698 --- [r-http-kqueue-2] r.n.r.DefaultPooledConnectionProvider    : [id: 0xbe11187e, L:/127.0.0.1:62937 - R:localhost/127.0.0.1:8084] Channel cleaned, now 0 active connections and 1 inactive connections
2021-05-17 01:50:21.336 DEBUG 9698 --- [r-http-kqueue-2] reactor.netty.http.client.HttpClient     : [id: 0xbe11187e, L:/127.0.0.1:62937 - R:localhost/127.0.0.1:8084] READ COMPLETE
14 replies
Aleksey Vasin
@youagree
hello, how can I pass request attributes into a REST controller from a WebFilter? Right now I use contextWrite to put a key/value pair and read it in the controller via deferContextual, and it does not work.
1 reply
rs2152
@rs2152
I'm having a very bad experience with pub/sub in Redis in reactive Spring. Can anyone help me out?
Madhukar Pandey
@madhukar2505
ReactiveValueOperations<String, ProfileInfo> opsForValue = redisTemplate.opsForValue();
Mono<ProfileInfo> mono2 = opsForValue.get("key");
how to check if a value exists in mono2 or not?
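For the question above: you can't inspect a Mono's value synchronously; hasElement() or switchIfEmpty keep the check inside the chain. A pure-Reactor sketch (Mono.just/Mono.empty stand in for a Redis hit and miss from opsForValue.get):

```java
import reactor.core.publisher.Mono;

Mono<String> hit  = Mono.just("profile");   // stand-in for a key that exists
Mono<String> miss = Mono.empty();           // stand-in for a missing key

Boolean hasHit  = hit.hasElement().block();   // true when the Mono emits a value
Boolean hasMiss = miss.hasElement().block();  // false for an empty Mono
String fallback = miss.switchIfEmpty(Mono.just("default")).block();
```

The block() calls are only for this demo; in application code you would keep composing on hasElement()/switchIfEmpty instead.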
Hamza Bahassou
@B-hamza
Hello here. While using spring-webflux in a simple controller test with the default Netty server, I found that all the Jackson2JsonEncoder tasks are subscribed on a different parallel scheduler. Does anyone know why? Is there any documentation about this choice?
1 reply
rs2152
@rs2152
hey, I'm using the Jsonb data type. When I try to put data into Redis with the Kryo serializer, I get the exception: Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: io.r2dbc.postgresql.codec.Json$JsonByteArrayInput. When I try to register the JsonByteArrayInput class, I'm not able to access it. Any help please?
fedortche
@fedortche
hi guys, is there any tutorial/documentation on how to implement operator fusion correctly?
Aleksey Vasin
@youagree
hello, why do I get java.util.NoSuchElementException: Context is empty? I put params in a WebFilter and read them in an aspect.
Aleksey Vasin
@youagree
can I get properties inside an aspect from the Reactor context?
blake-bauman
@blake-bauman
When using Flux.parallel(), is there a way to control which rail each message gets assigned to, such as through a Predicate?
6 replies
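Re the thread above: ParallelFlux distributes round-robin and doesn't expose rail selection, so the usual key-based alternative is groupBy, where your key function plays the role of the predicate deciding the "rail". A minimal sketch:

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.List;

List<String> out = Flux.range(1, 10)
        .groupBy(i -> i % 3)                       // routing key decides the "rail"
        .flatMap(group -> group
                .publishOn(Schedulers.parallel())  // each group processed on the scheduler
                .map(i -> group.key() + ":" + i))
        .collectList()
        .block();
```

Note that groupBy needs the number of distinct keys to stay below flatMap's concurrency (256 by default), otherwise the pipeline can hang.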
Brigen Tafilica
@BigyG_gitlab
Flux.fromArray(DimRating.values())
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(dimRating ->
                        Flux.fromStream(repositoryReader.findByBatchId(batchId, dimRating))
                                .publishOn(Schedulers.parallel())
                                .map(document -> {
                                    Account account = repositoryReader.ConvertBsonDocument2Account(document);
                                    int rc = readCount.incrementAndGet();
                                    try {
                                        processor.process(account);
                                    } catch (InvalidProcessorConfigException e) {
                                        e.printStackTrace();
                                    }
                                    if (rc % 1000 == 0)
                                        log.warn("PROCESSED 1000 ACC");
                                    return repositoryReader.getUpdate(account);
                                })
                                .buffer(10000)
                                .publishOn(Schedulers.parallel())
                                .map(this::bulkWriteResult)
                                .doOnComplete(() -> log.info("ECL Calculation Finished"))
                )
                .subscribe();

    public BulkWriteResult bulkWriteResult(List<Pair<Query, Update>> updates) {
        BulkWriteResult resultList = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, Account.class).updateOne(updates).execute();
        log.info("ExecuteBulkBufferedCount {} matchedCount {} modifiedCount {}", updates.size(), resultList.getMatchedCount(), resultList.getModifiedCount());
        return resultList;
    }
THIS IS GOING TO BE A LONG READ, SORRY
In the above code we have 10 different DimRating values. What we want to do is make a DB call for each dim rating in parallel, to find accounts by dim rating.
In parallel, because we proved that we can read faster from Mongo if we read in parallel for each dimRating than with a single findAll call.
We are talking about millions of records (20M best case), and yes, there is no other way: we need to read and process all of them.
Now, each account needs to be processed via processor.process(account), and this is kind of a blocking method.
Let's say each account needs to go through some processing that takes 1 sec per account.
Then, after every account is processed, we need to update it in the DB, and we thought a bulk update in batches of 10000 would be a smart thing.
Now I know I am doing something wrong, because the speed of the full read-process-update for 20M accounts is worse the reactive way than with the old-school approach of creating new threads and implementing concurrency the way we used to in imperative code.
I would LOVE some help here to point out what I'm doing wrong and what the best approach for this task is.
In one of my tries, since reading is faster, I tried to read and directly emit to a Sinks.Many unicast which was subscribed, with the process+update done in doOnNext,
but maybe I'm missing something, as it was still not at the performance level I was expecting.
1 reply
Aleksey Vasin
@youagree
hello, why do I get this when casting Mono<Object> to Mono<String>?
class reactor.core.publisher.MonoJust cannot be cast to class java.lang.CharSequence (reactor.core.publisher.MonoJust is in unnamed module of loader 'app'; java.lang.CharSequence is in module java.base of loader 'bootstrap')
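That ClassCastException usually means a Mono ended up nested inside another Mono, e.g. a map callback returning Mono.just(...) produces Mono<Mono<String>>, and a later cast to String then hits the MonoJust itself. A sketch of the two common fixes, flatMap to flatten and cast for a plain element cast:

```java
import reactor.core.publisher.Mono;

Mono<Object> source = Mono.just((Object) "hello");

// map(v -> Mono.just(...)) would nest Monos; flatMap flattens the inner Mono:
Mono<String> flattened = source.flatMap(v -> Mono.just(v.toString().toUpperCase()));

// for a simple element-type cast of Mono<Object> to Mono<String>:
Mono<String> casted = source.cast(String.class);
```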
bruto1
@bruto1

Is there a clean way to discard the contents of a unicast Many sink's queue (created with onBackpressureBuffer, obviously) when it's never subscribed to?
Only thing that comes to mind right now is keeping track of whether or not the subscription happened using some atomic boolean in sink.asFlux().doOnSubscribe(...) and conditionally discarding contents of its queue by hand in doOnCancel(...) downstream if it was cancelled early enough

A delay between subscriptions to sink.asFlux and downstream is possible for a number of reasons

Context, if anyone's curious: I want to pass reference counted elements to an interaction which may or may not happen but I need to release all of them correctly regardless of how many sink was able to emit to its subscriber

Mudit Khandelwal
@code-storm
In Java Spring Boot, while using WebClient: WebClient works fine on my local machine, while on the dev environment my API throws a
504 Gateway Timeout.
rs2152
@rs2152
hey, how do I use a ConstraintValidator in reactive Spring?

package com.caretap.docapp.validator;

import com.caretap.docapp.service.UserService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

import javax.validation.ConstraintValidator;
import javax.validation.ConstraintValidatorContext;
import java.util.Objects;

@Component
@RequiredArgsConstructor
public class AlreadyRegisteredPhoneValidator implements ConstraintValidator<AlreadyRegisteredPhone, String> {

    private final UserService userService;

    @Override
    public boolean isValid(String value, ConstraintValidatorContext context) {
        return Objects.isNull(userService.findByPhone(value));
    }
}

amarjawla
@amarjawla
how do I achieve parallelism for Mono.zip(monoA, monoB)?
amarjawla
@amarjawla

monoA.subscribeOn(Schedulers.parallel());
monoB.subscribeOn(Schedulers.parallel());
Mono.zip(monoA, monoB);

is that correct in order to achieve parallelism with the zip operation?

bruto1
@bruto1
subscriptions can still happen on the same thread, depending on your luck
it's best to use different schedulers :)
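One more thing worth noting about the snippet above: Mono is immutable, so bare `monoA.subscribeOn(...)` statements discard their result. The scheduled variants have to be the ones handed to zip. A minimal sketch:

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// chain subscribeOn into the values passed to zip; a discarded
// subscribeOn result has no effect on the original Mono
Mono<String> a = Mono.fromCallable(() -> "A").subscribeOn(Schedulers.boundedElastic());
Mono<String> b = Mono.fromCallable(() -> "B").subscribeOn(Schedulers.boundedElastic());

String result = Mono.zip(a, b)
        .map(t -> t.getT1() + t.getT2())
        .block();
```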
Jonathan Gray
@jongray
Hi, I'm getting a Flux<AcknowledgableDelivery> from RabbitFlux which I'd like to batch and send on somewhere else. If that send fails I'd like to nack all of the AcknowledgableDelivery in that batch, otherwise ack them. Is there a recommended approach for this? I was thinking of buffer() and then handling the error myself (is there one of the other onError methods I'd be better off using?). Also, I'd like to build the batch of messages into a request and send it using reactor-netty; is there a way to join these together?
5 replies
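A shape sketch for the question above: buffer the deliveries, then ack the whole batch on success and nack it (with requeue) on failure. The Delivery interface and the send function are stand-ins here; with reactor-rabbitmq you'd use AcknowledgableDelivery's ack()/nack(boolean) and your real downstream call.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// stand-in for reactor-rabbitmq's AcknowledgableDelivery
interface Delivery { void ack(); void nack(boolean requeue); }

List<String> outcomes = new ArrayList<>();

class FakeDelivery implements Delivery {
    final String id;
    FakeDelivery(String id) { this.id = id; }
    public void ack() { outcomes.add("ack:" + id); }
    public void nack(boolean requeue) { outcomes.add("nack:" + id); }
}

// hypothetical downstream send: succeeds for full batches, fails for the last partial one
Function<List<Delivery>, Mono<Void>> send =
        batch -> batch.size() == 2 ? Mono.empty()
                                   : Mono.error(new RuntimeException("send failed"));

Flux.<Delivery>just(new FakeDelivery("1"), new FakeDelivery("2"), new FakeDelivery("3"))
        .buffer(2)                                         // bufferTimeout(n, duration) in practice
        .concatMap(batch -> send.apply(batch)
                .doOnSuccess(v -> batch.forEach(Delivery::ack))
                .onErrorResume(e -> {
                    batch.forEach(d -> d.nack(true));      // requeue the whole failed batch
                    return Mono.empty();                   // keep consuming later batches
                }))
        .blockLast();
```

concatMap keeps batches sequential so acks stay in order; flatMap would allow concurrent sends if ordering doesn't matter.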
Moncef AOUDIA
@aoudiamoncef

Hello,
I'm trying to translate an existing RxJava 3 project to Reactor, but I found some issues:
1. Scheduler in RxJava 3 is an abstract class, but in Reactor it's an interface, and all implementations are final classes.
2. RxJava has the RxJavaPlugins utility class, but in Reactor there are Exceptions, Hooks, and Schedulers, with significant differences, which doesn't make the translation easier.

I need to implement a custom Scheduler like this one:
https://github.com/ReactiveX/RxAndroid/blob/3.x/rxandroid/src/main/java/io/reactivex/rxjava3/android/schedulers/HandlerScheduler.java

I haven't found any custom implementation of a Reactor Scheduler. Are there any resources which could help me?

I'm struggling with this HandlerScheduler... you can look at my progress in this repository:
https://github.com/aoudiamoncef/ReactorAndroid/tree/feature/reactor

Feedback is welcome.

Thanks

atulkd007
@atulkd007

Hello, I am trying to do something like this:
Principal principle = ReactiveSecurityContextHolder.getContext().map(x -> x.getAuthentication().getPrincipal());

How can I get 'Principal' Object from ReactiveSecurityContextHolder ?? Please help