Niranjan
@nnanda2016
depends on what suits your need :)
Niranjan
@nnanda2016
@skaparate won't this aReactorObject.flatMap { someObjectThatReturnsMonoOrFlux.block() } result in an exception from blocking on a Reactor thread?
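For reference, a minimal sketch of the non-blocking alternative (someService.call is a hypothetical stand-in for someObjectThatReturnsMonoOrFlux): return the inner publisher from flatMap instead of blocking on it, so Reactor subscribes on your behalf.

import reactor.core.publisher.Mono;

// blocking inside flatMap can throw "block()/blockFirst()/blockLast() are blocking,
// which is not supported in thread ..." on a parallel/single Scheduler thread
Mono<String> result = aReactorObject.flatMap(x -> someService.call(x)); // non-blocking: just return the inner Mono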
Daniil Mikhaylov
@mdsina
Hi there!
Is it possible to inherit or copy the subscriber context to a nested subscriber implicitly?
public Mono<Void> methodA() {
    return Mono.fromCallable(() -> {
        ...
    }).flatMap(o -> {
        someService.doJob()
              .subscribe(); // the subscriber Context will be empty here
        return Mono.empty();
    });
}
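A minimal sketch of the usual fix, assuming doJob() returns a publisher (loadSomething() is a hypothetical stand-in for the callable body): compose the inner publisher into the chain instead of calling subscribe() yourself, so the downstream subscriber's Context reaches it.

public Mono<Void> methodA() {
    return Mono.fromCallable(() -> loadSomething())
            .flatMap(o -> someService.doJob())  // composed, so the Context flows into doJob()
            .then();
}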
3 replies
skaparate
@skaparate

@nnanda2016 I didn't know that, but it seems to be the case: reactor/reactor-core#1972.

I'm still reading the docs :sweat_smile:, so thanks for the input

Scott White
@kibbled
@mdsina if methodA can return a Mono or Flux it should
the few times where I’ve had to work around that are for runners (classes that execute at server startup) and Spring scheduled jobs
vishal singh
@zealvault
Hi, how can I get a Flux<Student> from a Stream<Mono<Student>>? Thanks
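A minimal sketch of one way to do it (studentMonos is the hypothetical Stream<Mono<Student>>): lift the Stream into a Flux and flatten each inner Mono.

import reactor.core.publisher.Flux;

Flux<Student> students = Flux.fromStream(studentMonos)  // Flux<Mono<Student>>
        .flatMap(mono -> mono);                         // subscribe each Mono, merging the results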
4 replies
Omid Barat
@obarat
what's the correct return type for a method that returns a Mono.error(new Error())?
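For what it's worth, Mono.error is typed by inference, so the method can declare whichever element type its callers expect; a tiny sketch with a hypothetical method:

public Mono<Student> findStudent(String id) {
    return Mono.error(new IllegalStateException("not found: " + id)); // inferred as Mono<Student>
}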
1 reply
Buck Bukaty
@bbukaty
I think I may have found an error in the documentation for Mono.transformDeferred.
the illustration shows the second subscriber getting a stream with two elements, but if I understand this example's usage of repeat, I think there should be three elements?
Alif Bhaskoro
@xpanda17

Hi, I'm new to reactive programming and WebClient. I used WebClient to do API calls to an external API. During load testing (where I mocked the external API, so it always returns in a constant response time), I found that there is a delay after reaching a certain number of requests. After a few experiments, I concluded that I can fix this delay by simply increasing:

  • Number of max connections in connection pool
  • Number of threads in event loop (I set the webclient to use separate event loop)

Currently I set both to a high number (2000 connections + 200 event loop threads).

IMO, using a high number of threads and connections is not a proper solution for a production env. Does anyone have any idea how to fix it? Thanks
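For context, a sketch of where those two knobs live when building a WebClient on Reactor Netty (the pool and loop names are arbitrary; the numbers are the ones mentioned above):

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

ConnectionProvider pool = ConnectionProvider.builder("api-pool")
        .maxConnections(2000)                                      // knob 1: pool size
        .build();
LoopResources loops = LoopResources.create("api-loop", 200, true); // knob 2: event-loop threads

WebClient client = WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(HttpClient.create(pool).runOn(loops)))
        .build();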

3 replies
Jeevjyot Singh Chhabda
@jeevjyotc

Hi, we use the reactive Kafka library for producing and consuming, and that works great. I need to design a solution to check the connectivity between my service and Kafka. I was thinking of using the Kafka admin client, but it looks like reactive Kafka doesn't provide a way to do that! Any advice or recommendation on how I should proceed?

Worst case scenario: I produce a message (but this would pollute the topic with dummy/test/health messages)

Thanks for your time
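One possible approach, sketched under the assumption that a plain (blocking) Kafka AdminClient call is acceptable for a health check as long as it is kept off the reactive threads (adminClient is a pre-configured AdminClient):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// describeCluster() only succeeds if a broker is reachable
Mono<Boolean> kafkaUp = Mono
        .fromCallable(() -> adminClient.describeCluster().nodes().get(5, TimeUnit.SECONDS))
        .map(nodes -> !nodes.isEmpty())
        .subscribeOn(Schedulers.boundedElastic())  // keep the blocking get() off the event loop
        .onErrorReturn(false);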

venkatasreekanth
@venkatasreekanth
can anyone explain what could cause this
reactor.core.Exceptions$StaticRejectedExecutionException: Scheduler unavailable

2020-09-02 23:58:08.671 ERROR 60954 --- [sample-group8-1] reactor.core.scheduler.Schedulers        : KafkaScheduler worker in group main failed with an uncaught exception

reactor.core.Exceptions$StaticRejectedExecutionException: Scheduler unavailable

2020-09-02 23:58:08.672 ERROR 60954 --- [sample-group8-1] reactor.core.scheduler.Schedulers        : KafkaScheduler worker in group main failed with an uncaught exception

reactor.core.Exceptions$StaticRejectedExecutionException: Scheduler unavailable

2020-09-02 23:58:08.674 ERROR 60954 --- [sample-group8-1] reactor.core.scheduler.Schedulers        : KafkaScheduler worker in group main failed with an uncaught exception
Here is the code I wrote
public ReceiverOffset storeInDb(List<ReceiverRecord<String, String>> receiverRecords) {
    Instant before = Instant.now();
    MongoDatabase mongoDatabase = mongoClient.getDatabase(mongoDatabaseName);
    MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(mongoCollectionName);
    List<UpdateOneModel<Document>> list = new ArrayList<>();
    receiverRecords.forEach(r ->
            list.add(new UpdateOneModel<>(
                    new Document("Item ID", r.key()),
                    new Document("$set", Document.parse(r.value()).append("_id", r.key())),
                    new UpdateOptions().upsert(true))));
    log.debug("upsert list size {}", list.size());

    // blocking bulk write; runs on the scheduler installed via publishOn below
    BulkWriteResult bulkWriteResult = mongoCollection.bulkWrite(list);
    Instant after = Instant.now();

    System.out.println("write time = " + Duration.between(before, after).getSeconds());
    log.info(bulkWriteResult.toString());
    return receiverRecords.get(receiverRecords.size() - 1).receiverOffset();
}

@Override
public void run(ApplicationArguments args) throws Exception {
    Scheduler scheduler = Schedulers.newBoundedElastic(60, Integer.MAX_VALUE, "partition-consumer", 600, true);

    ReceiverOptions<String, String> options = receiverOptions.subscription(Collections.singleton(itemTopic))
            .commitInterval(Duration.ZERO)
            .commitBatchSize(0)
            .addAssignListener(receiverPartitions -> log.debug("onPartitionsAssigned {}", receiverPartitions))
            .addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));

    Flux<ReceiverRecord<String, String>> kafkaFlux = KafkaReceiver.create(options).receive();
    kafkaFlux.groupBy(m -> m.receiverOffset().topicPartition())
            .flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
                    .buffer(1000)
                    .map(receiverRecords -> storeInDb(receiverRecords))
                    .concatMap(receiverOffset -> receiverOffset.commit()))
            .doOnCancel(scheduler::dispose) // disposes the shared scheduler on cancel
            .retry()
            .subscribe();
}
1 reply
郝羽
@harry-hao
Maybe the scheduler was disposed? @venkatasreekanth
Laurent Bovet
@lbovet
I need to schedule processing according to a key: all items with the same key must be processed by the same thread. I have not found a scheduler for that. Should I implement my own Executor and use Schedulers.fromExecutor(getMyExecutor(key))?
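A sketch of one alternative that avoids a custom Executor: pin each key to one of N single-threaded schedulers by hash, so equal keys always run on the same thread (N, the key type, and the class name are assumptions):

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class KeyedSchedulers {
    private final Scheduler[] workers;

    KeyedSchedulers(int n) {
        workers = new Scheduler[n];
        for (int i = 0; i < n; i++) {
            workers[i] = Schedulers.newSingle("keyed-" + i);
        }
    }

    // equal keys always map to the same single-threaded scheduler
    Scheduler forKey(Object key) {
        return workers[Math.floorMod(key.hashCode(), workers.length)];
    }
}

// usage sketch: flux.groupBy(this::keyOf).flatMap(g -> g.publishOn(keyed.forKey(g.key())).map(this::process))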
Buck Bukaty
@bbukaty

the illustration shows the second subscriber getting a stream with two elements, but if I understand this example's usage of repeat, I think there should be three elements?

I opened this quick issue re: the error in that diagram
reactor/reactor-core#2358

Red444
@MiloS444
What would be the best approach to compare two Mono<List<SomeObj>>?
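If equals-based comparison is enough, one minimal sketch is to zip the two Monos and compare once both lists arrive (listA and listB are the two Mono<List<SomeObj>>; note that zip completes empty if either source is empty):

import java.util.List;
import reactor.core.publisher.Mono;

Mono<Boolean> same = Mono.zip(listA, listB)
        .map(t -> t.getT1().equals(t.getT2())); // element-by-element, in order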
Daniil Mikhaylov
@mdsina
Hi there!
How can I handle error and next signals in one operator? handle doesn't seem to be about errors.
Mister_Jesus
@Bittuw
@mdsina, Mono::materialize or Flux::materialize
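A small sketch of that suggestion (source is a hypothetical Flux): materialize turns values, errors, and completion into onNext Signals, so a single downstream operator sees them all.

import reactor.core.publisher.Flux;

Flux<String> described = source.materialize()
        .map(signal -> signal.isOnError()
                ? "error: " + signal.getThrowable()
                : signal.isOnNext() ? "next: " + signal.get() : "complete");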
Saad Rashid
@saadrashid_gitlab
Hi, is there a way to make a GraphQL client reactive?
Currently I need to convert with Mono.toFuture to make it work.
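Going the other way is a one-liner, assuming the client exposes a CompletableFuture-based call (graphQlClient and executeAsync are hypothetical names):

Mono<Response> response = Mono.fromFuture(() -> graphQlClient.executeAsync(request));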
rbrose
@rbrose
Hi
    public Mono<Boolean> linkNewBookmarkTopic(Long userId, List<BookmarkRequest> bookmarkRequests) {
        Flux<BookmarkRequest> linkedTopics = Flux.fromIterable(bookmarkRequests);

        return linkedTopics.flatMap(br -> {
                    var linkToTopic = databaseClient.insert()
                            .into(TopicLinks.class)
                            .using(
                                    new TopicLinks(null, br.getTopic_id(), br.getLinked_topic_id(), userId, OffsetDateTime.now())
                            )
                            .fetch()
                            .one().then();

                    var executeOnlyIfTopicNotExist = databaseClient.insert()
                            .into(TopicLinks.class)
                            .using(
                                    new TopicLinks(null, br.getLinked_topic_id(), br.getTopic_id(), userId, OffsetDateTime.now())
                            )
                            .fetch()
                            .one().then();

                    var linkTopicToLink = databaseClient.select().from(TopicLinks.class)
                            .matching(Criteria.where("topic_id").is(br.getLinked_topic_id()).and(Criteria.where("linked_topic_id").is(br.getTopic_id())))
                            .fetch()
                            .one()
                            .then(); // then() completes empty whether or not a row matched

                    return linkToTopic.mergeWith(linkTopicToLink.switchIfEmpty(executeOnlyIfTopicNotExist)).then();
                }
        ).then(Mono.just(Boolean.TRUE));
    }
linkToTopic.mergeWith(linkTopicToLink.switchIfEmpty(executeOnlyIfTopicNotExist)).then();
I want to execute "executeOnlyIfTopicNotExist" only if linkTopicToLink doesn't return a result.
If "linkTopicToLink" has one row, then "executeOnlyIfTopicNotExist" must not be executed.
At the moment, executeOnlyIfTopicNotExist is always executed.
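The likely culprit is the .then() on linkTopicToLink: it turns the select into a Mono<Void> that completes empty whether or not a row matched, so switchIfEmpty always fires. A minimal sketch of one fix, basing the decision on hasElement() instead (linkTopicToLink here means the select without the trailing .then()):

Mono<Void> insertIfMissing = linkTopicToLink   // Mono of the selected row, empty when none matches
        .hasElement()                          // Mono<Boolean> that always emits
        .flatMap(exists -> exists ? Mono.<Void>empty() : executeOnlyIfTopicNotExist);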
6 replies
natraj09
@natraj09
I am using the metrics() and tag() methods on Flux to capture a few metrics, e.g. flux.filter().tag("filter","true").flatMap().metrics(). Is there a way I can apply a tag inside filter, i.e. to the values that were ignored?
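Sequence-level metrics() tags can't vary per value, so one workaround sketch is counting the rejected values yourself with a Micrometer counter (registry, matches, process, and the metric name are assumptions):

import io.micrometer.core.instrument.Counter;

Counter droppedByFilter = registry.counter("myflow.dropped", "filter", "true");

flux.filter(v -> {
        boolean keep = matches(v);       // matches() stands in for the real predicate
        if (!keep) {
            droppedByFilter.increment(); // count what the filter ignores
        }
        return keep;
    })
    .flatMap(this::process)
    .metrics();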
eonezhang
@eonezhang
Hi, could anyone share some links about the mechanism of subscriberContext, subscribeOn, and publishOn? I don't quite understand the execution flow of Mono or Flux.
Ugnius Rumsevicius
@UgiR
@simonbasle is this doc snippet from Spring Cloud, showing use of EmitterProcessor#onNext, correct, or should one retrieve a sink from the processor in this case?
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.6.RELEASE/reference/html/spring-cloud-stream.html#_using_reactor_api
Scott Busche
@busches
Is it intended behavior that if an error is emitted/thrown inside filterWhen it discards the event? We'd expect it to emit the error to the error handlers, but it does not.
demtnt
@demtnt
Hi. What is the correct way to combine 2 Monos (each with a possibly empty result) into another complex object? As I understand it, zip is not the right way because it skips empty Monos?
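zip does skip when either source is empty; one common workaround sketch is to wrap each value in Optional with defaultIfEmpty so zip always fires (monoA, monoB, and ComplexObject are placeholders):

import java.util.Optional;
import reactor.core.publisher.Mono;

Mono<ComplexObject> combined = Mono.zip(
                monoA.map(Optional::of).defaultIfEmpty(Optional.empty()),
                monoB.map(Optional::of).defaultIfEmpty(Optional.empty()))
        .map(t -> new ComplexObject(
                t.getT1().orElse(null),   // null when monoA was empty
                t.getT2().orElse(null)));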
3 replies
magware
@magware-dev

Hi, I have a question about Mono.zipDelayError. The javadoc says:

If a Mono source completes without value, the other source is run to completion then the resulting Mono completes empty

// core/test 3.3.9
TestPublisher<Integer> testPublisher = TestPublisher.create();

StepVerifier.create(
    Mono.zipDelayError(
        Mono.empty(),
        testPublisher.mono()
    )
)
    .then(() -> testPublisher.emit(1))
    .verifyComplete();

testPublisher.assertWasRequested();
testPublisher.assertWasNotCancelled();

This test case fails with java.lang.AssertionError: PublisherProbe should not have been cancelled but it was.

So it looks like the other publisher is actually cancelled and not run to completion. Did I get anything wrong here?

venkatasreekanth
@venkatasreekanth
Flux<ReceiverRecord<String, String>> kafkaFlux = KafkaReceiver.create(options).receive();

    kafkaFlux.groupBy(m -> m.receiverOffset().topicPartition())
            .flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
                    .buffer(1000)
                    .map(receiverRecords -> storeInDb(receiverRecords))
                    .concatMap(receiverOffset -> receiverOffset.commit()))
//          .doOnCancel(() -> scheduler.dispose())
            .retry()
            .subscribe(unused -> { }, error -> error.printStackTrace());
    }
When I add retry, I get the following exception. What am I doing wrong here?
 Multiple subscribers are not supported for KafkaReceiver flux
    at reactor.kafka.receiver.internals.DefaultKafkaReceiver.start(DefaultKafkaReceiver.java:308) ~[reactor-kafka-1.2.2.RELEASE.jar:1.2.2.RELEASE]
    at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$createConsumerFlux$10(DefaultKafkaReceiver.java:263) ~[reactor-kafka-1.2.2.RELEASE.jar:1.2.2.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:162) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onSubscribe(FluxPublishOn.java:206) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.EmitterProcessor.subscribe(EmitterProcessor.java:169) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:110) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxRetry$RetrySubscriber.onError(FluxRetry.java:94) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:834) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
2020-09-15 11:35:05.369 ERROR 31292 --- [ample-group11-1] reactor.core.scheduler.Schedulers        : KafkaScheduler worker in group main failed with an uncaught exception

reactor.core.Exceptions$StaticRejectedExecutionException: Scheduler unavailable
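The receive() Flux only supports a single subscriber, and retry() works by resubscribing; a sketch of the usual workaround is to defer creation so each retry builds a fresh receiver (options is the ReceiverOptions from the snippet above):

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;

// every (re)subscription triggered by retry() now creates its own KafkaReceiver
Flux<ReceiverRecord<String, String>> kafkaFlux =
        Flux.defer(() -> KafkaReceiver.create(options).receive());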
Kirill Marchuk
@62mkv

Hi chat! A question related to reactive RabbitMQ: we have a problem which I could only reproduce against our in-house RabbitMQ server, and even then only under some specific circumstances.

the gist of the problem is that occasionally, when our application (WebFlux/Reactive-RabbitMQ) is publishing and receiving many messages in parallel (think more than 20 parallel consumers; I use k6 --vus 50 to create load), it starts throwing exceptions like this:

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: javax.net.ssl.SSLException: Tag mismatch!
    at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:175) ~[amqp-client-5.9.0.jar:5.9.0]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:

and

javax.net.ssl.SSLException: Buffer closed
        at com.rabbitmq.client.impl.nio.SslEngineHelper.write(SslEngineHelper.java:176) ~[amqp-client-5.9.0.jar:5.9.0]
        at com.rabbitmq.client.impl.nio.SslEngineByteBufferOutputStream.doFlush(SslEngineByteBufferOutputStream.java:59) ~[amqp-client-5.9.0.jar:5.9.0]
        at com.rabbitmq.client.impl.nio.SslEngineByteBufferOutputStream.flush(SslEngineByteBufferOutputStream.java:53) ~[amqp-client-5.9.0.jar:5.9.0]
        at java.base/java.io.DataOutputStream.flush(DataOutputStream.java:123) ~[na:na]
        at com.rabbitmq.client.impl.nio.NioLoop.run(NioLoop.java:244) ~[amqp-client-5.9.0.jar:5.9.0]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

and

reactor.rabbitmq.RabbitFluxException: Error while setting publisher confirms on channel
        at reactor.rabbitmq.Sender.lambda$sendWithTypedPublishConfirms$6(Sender.java:282) ~[reactor-rabbitmq-1.4.4.RELEASE.jar:1.4.4.RELEASE]
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:

etc. There were certainly some other ones; I can't find them in the logs right now. When I run exactly the same scenario with the "traditional" stack (Spring WebMVC + Spring AMQP), not a single exception of this sort occurs.

Is it worth reporting, given that I cannot provide a reproducing case? I could collect whatever information is necessary, though, if someone could suggest appropriate tools or ways to pin it down.
Dalton Mills
@ddmills

Hello! We're getting this exception in our rather large reactive application. I don't know where to begin tracking down where this error is coming from, as the message feels kind of vague. Any thoughts on where I might begin to look?

[image attachment: image.png]

3 replies
Ioannis Noukakis
@ioannisNoukakis
Hey guys, do you have a good linter for your Reactor projects?
David Herberth
@Dav1dde
I have a Flux<ByteBuffer> and want to accumulate the byte buffers until a certain separator is reached, read the header (the first part, up to the separator), and then wrap everything in a Java object Message(parsedHeaders, Flux<ByteBuffer>), where the Flux inside the object should contain the full data (including the header). I tried accumulating with bufferUntil() and then using switchOnFirst to transform the flux into a Mono with the Java object inside. The problem is that switchOnFirst().then() cancels the flux after one item, and switchOnFirst passes that cancellation on to the original source, which also cancels the flux containing the data. Any ideas how I can do that?
Bart Veenstra
@bartveenstra
Quick question: I have 2 Monos, each emitting either Mono.empty or a value. The value I want to return is from whichever of the 2 actually returns a value.
But Mono.first also takes the first Mono.empty. Is there a way I could subscribe to the first value that comes in?
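Two sketches, depending on whether preference or speed matters (a and b are the two Monos):

// prefer a, falling back to b only when a completes empty
Mono<String> preferred = a.switchIfEmpty(b);

// or, on newer Reactor versions (3.4+), race both and keep the first actual value
Mono<String> fastest = Mono.firstWithValue(a, b);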
3 replies
Prashant Pandey
@limafoxtrottango
I am trying to understand the behaviour of exponentialBackoffWithJitter, because it doesn't quite behave the way I expect it to. Q1: exponentialBackoffWithJitter(Duration firstBackoff, Duration maxBackoff) doesn't work when maxBackoff is null; I HAVE to provide a non-null value here for it to work. Q2: Despite the jitter, the interval between subsequent retries always increases. I would expect that, since each interval is chosen randomly between firstBackoff and firstBackoff * 2^n, it should not always follow this increasing trend?
Prashant Pandey
@limafoxtrottango
Also, what's the use of maxBackoff? The retries might never terminate if the randomly chosen interval is always less than it and there is no explicit cap on the number of retries via .retryMax.
parthashah
@Parthashah

Hi,
I'm very new to WebFlux, and I'm required to support an entity for older code.
Example: a Group [POJO] containing User [POJO]s, where Group has a setUsers method, so I can render JSON like
{ "Groupid": "xyz", "users": [{"userid": 1}, {"userid": 2}] } and so on.

So how can I merge each element of a Flux<Group> with a Mono<List<User>> to produce such output (see the sketch below)?
Can anyone help with this? Any pre-implemented relational GitHub project would also work for reference.
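A minimal sketch of one way to do the merge, assuming every Group receives the same user list via the setUsers method described above (usersMono and groupFlux are hypothetical names):

import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Flux<Group> enriched = usersMono                 // Mono<List<User>>
        .flatMapMany(users -> groupFlux          // Flux<Group>
                .map(group -> {
                    group.setUsers(users);       // attach the shared user list
                    return group;
                }));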

ruslan
@unoexperto
Hi everyone! What is the right way to delay execution of a Mono? I have custom retryWhen logic where I get the delay time from an external API. No matter what I try, though, there is no delay. Here is my function. Could you please tell me what I'm missing?
internal fun <T> Mono<T>.retryAfter(maxRetries: Int, uri: URI): Mono<T> {
    return this.retryWhen(Retry.from {
        it.map { rs -> // likely the issue: map emits the inner delay Mono as a value without subscribing it; concatMap would actually await the delay
            val ex = rs.failure()
            if (ex is RetryAfterException && rs.totalRetries() < maxRetries) {
                println("*** API throttling on call to $uri. Will wait for ${ex.delay}. Retry count: ${rs.totalRetries()}. ms ${System.currentTimeMillis()}")
                Mono.delay(ex.delay.plusMillis(500), Schedulers.parallel()).then(Mono.defer({
                    println("   Waited. ${System.currentTimeMillis()}")
                    this
                }))
            } else
                Mono.error(rs.failure())
        }
    })
}