KATKrazy
@KATKrazy
I need it too
Oleh Dokuka
@OlegDokuka
@mwmitchell @KATKrazy can you please clarify what you mean by “multiple”? Do you expect that elements will be processed kinda “in parallel”, or do you expect that you can just submit values for processing from multiple threads but the actual processing will be done “serially” (a.k.a. sequentially)?
4 replies
Daan Kerkhofs
@kerkhofsd

I see this has been asked before, however I did not find an answer to the question:
Is there a specific reason for the move of AssertSubscriber (and earlier TestSubscriber) to the test source-set of reactor-core? I think it would be great to have its functionality publicly exposed via reactor-test.

https://github.com/reactor/reactor-core/blob/3f19e1806574b6cbc5bd45d7410c14bdcdca04ce/reactor-core/src/test/java/reactor/test/subscriber/AssertSubscriber.java

Matt Mitchell
@mwmitchell
I have a subscription to an "input" flux, which has a doOnNext handler that blocks, and for each doOnNext, it emits multiple items to an "output" sink. I am occasionally hitting this error when calling tryEmitNext on the output sink: Sinks.EmitResult.FAIL_OVERFLOW - how do I deal with this?
4 replies
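(Editor's note: a minimal sketch, not from the thread, of one way to react to that EmitResult explicitly instead of losing the element; a bounded queue like the one described is what makes FAIL_OVERFLOW possible.)

    import java.util.concurrent.ArrayBlockingQueue;
    import reactor.core.publisher.Sinks;

    public class OverflowSketch {
        public static void main(String[] args) {
            // A bounded queue makes FAIL_OVERFLOW possible when the subscriber lags behind.
            Sinks.Many<String> output = Sinks.many().unicast()
                    .onBackpressureBuffer(new ArrayBlockingQueue<>(8));

            Sinks.EmitResult result = output.tryEmitNext("value");
            if (result == Sinks.EmitResult.FAIL_OVERFLOW) {
                // The buffer is full: drop the element, stash it for a later retry,
                // or slow the producing side down until the subscriber requests more.
            }
        }
    }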
R2J
@rajeev-jha1988
How can I make sure that all the AOP advice finishes executing before my actual controller gets called?
Due to the non-blocking nature, the AOP call finishes executing after the method call.
Matt Mitchell
@mwmitchell
I'm trying to create branches of transformations based on an item value. I know I'm not doing this correctly (because it doesn't work), but does anyone know what I'm doing wrong? The output I want here is a stream of 2s and 3s in the correct sequence:
Flux.range(1, 11)
        .publish(f -> f.filter(i -> i % 2 == 0).log("even").map(i -> 2).switchIfEmpty(f))
        .publish(f -> f.filter(i -> i % 2 != 0).log("odd").map(i -> 3))
        .log("output")
        .subscribe();
8 replies
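(Editor's note: a hedged sketch, not taken from the thread, of two ways to get a stream of 2s and 3s: a plain map guarantees the original sequence, and groupBy keeps genuinely separate per-branch pipelines.)

    import reactor.core.publisher.Flux;

    public class BranchSketch {
        public static void main(String[] args) {
            // Simplest way to get "2 for even, 3 for odd" in the original sequence:
            Flux.range(1, 11)
                    .map(i -> i % 2 == 0 ? 2 : 3)
                    .log("output")
                    .subscribe();

            // If separate per-branch pipelines are really needed, groupBy is the usual tool;
            // note that strict interleaving across groups is no longer guaranteed:
            Flux.range(1, 11)
                    .groupBy(i -> i % 2 == 0)
                    .flatMap(g -> g.key() ? g.map(i -> 2) : g.map(i -> 3))
                    .subscribe(System.out::println);
        }
    }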
Phil Clay
@philsttr
1 reply
David Schneider
@bivab

Hello, I have a question about the expected behaviour of the .publish and error signals. Testing the operator on Flux and Mono is giving me different results in that specific case.
Testing

StepVerifier.create(Mono.<Integer>error(new RuntimeException("Error")).publish(x -> x))
        .expectErrorMatches(throwable -> throwable.getMessage().equals("Error"))
        .verify();

fails, while the same for Flux passes; I'm wondering which would be the expected behaviour?

Timur Shaidullin
@timur-sh

Hi!

I need to execute a blocking call via JdbcTemplate in my reactive program. I found in the official docs that the best way to do it is to use Schedulers.boundedElastic(). But when I tried to execute it on Schedulers.parallel(), I got the same non-blocking result. Which of Schedulers.parallel() and Schedulers.boundedElastic() is preferable to use?

6 replies
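(Editor's note: for reference, a minimal sketch of the pattern the reference docs describe for wrapping blocking work. boundedElastic is intended for blocking calls, while parallel is a small fixed-size pool for non-blocking CPU work that blocking calls can starve. The blockingJdbcQuery method below is a stand-in for a JdbcTemplate call.)

    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    public class BlockingCallSketch {
        public static void main(String[] args) {
            Mono<String> result = Mono.fromCallable(BlockingCallSketch::blockingJdbcQuery) // wrap the blocking call
                    .subscribeOn(Schedulers.boundedElastic());  // run it on a scheduler meant for blocking work

            System.out.println(result.block());
        }

        // stands in for a JdbcTemplate query; any blocking call is wrapped the same way
        static String blockingJdbcQuery() throws InterruptedException {
            Thread.sleep(100);
            return "row";
        }
    }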
Dan Cohen-Smith
@dancohensmith
Regression with reactor kafka 1.3.x: it no longer calls doOnNext with the sender when there is an error unless toStopOnError is set. This means that you have no idea what has failed to send when a failure occurs.
1 reply
malone
@malone081021
Is there a hook for when the thread hops with publishOn or subscribeOn? Or some way for reactor to intercept the thread switch?
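(Editor's note: a hedged sketch of the closest mechanism I'm aware of — Schedulers.onScheduleHook decorates every Runnable handed to Reactor schedulers, so it fires whenever work is moved onto another thread by publishOn/subscribeOn; it is not a dedicated "thread switch" callback.)

    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;

    public class ScheduleHookSketch {
        public static void main(String[] args) {
            // Wraps every task submitted to a Reactor scheduler.
            Schedulers.onScheduleHook("log-hop", task -> () -> {
                System.out.println("about to run on " + Thread.currentThread().getName());
                task.run();
            });

            Flux.range(1, 3)
                    .publishOn(Schedulers.parallel())
                    .blockLast();

            Schedulers.resetOnScheduleHook("log-hop"); // remove the hook again
        }
    }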
Thanveer
@Thanvee85299935_twitter

Hi, I am new to webflux/reactive programming. I have a query: I am trying to add a filter, and once the filter condition succeeds, I call the request handler method using "next.handle(request)".

i.e. below is my RouterFunction:

public RouterFunction<ServerResponse> sampleRouter() {
    return route(POST(PATH).and(accept(APPLICATION_JSON)), requestHandler::login)
            .filter(new FilterHandler());
}

And inside the filter() method I am checking the condition, and if the condition is true then it should call the requestHandler function "login()".

public Mono<ServerResponse> login(final ServerRequest serverRequest) {
    final String serviceName = "login";
    LOGGER.info("Service Method Invoked : {}", serviceName);
    return (Mono<ServerResponse>) serverRequest.bodyToMono(Request.class)
            .flatMap(request -> serviceCaller.callPost(serverRequest, request, Object.class, serviceName))
            .flatMap(response -> ServerResponse.ok().body(fromObject(response)))
            .onErrorResume(error -> errorHandler.throwableError(error));
}
FilterHandler.java's overridden filter method is below:

@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
    Mono<Request> requestMono = request.bodyToMono(Request.class);
    Mono<Data> dataMono = data.retrieveData(request, Data.class);

    return Mono.zip(requestMono, dataMono).flatMap(combinedData -> {
        if (combinedData.getT1().getId().equals(combinedData.getT2().getId())) {
            return next.handle(request); // this will call the request handler method login()
        } else {
            return ServerResponse.status(403).build();
        }
    });
}

But in this case the login method will not return the "response", as it returns before the .flatMap(response -> ServerResponse.ok().body(fromObject(response))) step of the login method runs.

Is there any appropriate way in reactive programming to return the response from login method when it is invoked within the flatMap of Mono.zip(requestMono, dataMono)?

3 replies
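(Editor's note: a hypothetical sketch using the types from the question, and not necessarily the cause of the issue above. When both a filter and a handler need the request body, a commonly suggested pattern is to read the body once in the filter, stash the parsed object in the request attributes, and let the handler pick it up instead of calling bodyToMono a second time, since the body stream can normally only be consumed once.)

    @Override
    public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
        return Mono.zip(request.bodyToMono(Request.class), data.retrieveData(request, Data.class))
                .flatMap(tuple -> {
                    if (tuple.getT1().getId().equals(tuple.getT2().getId())) {
                        request.attributes().put("parsedBody", tuple.getT1()); // share the already-read body
                        return next.handle(request);
                    }
                    return ServerResponse.status(403).build();
                });
    }

    // and in login(), instead of serverRequest.bodyToMono(Request.class):
    Request body = (Request) serverRequest.attributes().get("parsedBody");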
Jorge F. Sánchez
@jfsanchez_gitlab
Hi everyone, is there any example project on how to configure Reactor Kafka and Spring Cloud Sleuth??
4 replies
Maksym Hryhoriev
@max-grigoriev

Hi there,

How is it possible that I have this:

Mono.just(someValue)
    .doOnNext(err -> log.error("on next..."))
    .flatMap(client::execute)
    .doOnError(err -> log.error("on err... {}", err.hashCode()))

and I see such logs:

18:04:00.023Z ERROR [boundedElastic-3]       - on next...
18:04:00.826Z ERROR [reactor-http-kqueue-12] - on err... 1282189168
18:04:00.826Z ERROR [reactor-http-kqueue-11] - on err... 1561963113

If it's a Mono then I should receive a single value or a single error, but I see two errors.

2 replies
Alif Bhaskoro
@xpanda17
Hi all, I'd like to learn more about the cancellation signal propagated by Mono (mainly when this signal is propagated), but I can't seem to find documentation about this. Any ideas?
2 replies
rs2152
@rs2152
Hi, I'm new to Spring Reactive. I want to store some data in a database and after that in Redis. Whenever I convert an object into JSON using ObjectMapper there is a checked exception; how can I handle that? Does anyone have a demo? Thanks
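(Editor's note: a minimal sketch of the usual way to deal with a checked exception inside a reactive chain, assuming Jackson's ObjectMapper: wrap the throwing call in Mono.fromCallable so the exception surfaces as an onError signal instead of needing a try/catch in every lambda.)

    import com.fasterxml.jackson.databind.ObjectMapper;
    import reactor.core.publisher.Mono;

    public class JsonSketch {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // writeValueAsString throws a checked JsonProcessingException;
        // fromCallable turns it into an onError signal downstream can handle.
        static Mono<String> toJson(Object value) {
            return Mono.fromCallable(() -> MAPPER.writeValueAsString(value));
        }

        public static void main(String[] args) {
            toJson(java.util.Map.of("id", 1))
                    .subscribe(System.out::println, Throwable::printStackTrace);
        }
    }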
Mantas Matūzas
@mantoshelis

Hello, I have simple logic to register a user if it does not exist. If it exists, an error should be thrown. Unfortunately, switchIfEmpty (when the user is not found) is not being called.
I'm using the DynamoDB async client (it uses CompletableFuture).

My handler:

    public Mono<ServerResponse> register(ServerRequest serverRequest) {
        return serverRequest.bodyToMono(UserRegisterRequest.class)
                .flatMap(request -> userRepository.getByPhone(request.getPhone()))
                .flatMap(existing -> Mono.error(new UserAlreadyExistsException(existing.getPhone())))
                .switchIfEmpty(serverRequest.bodyToMono(UserRegisterRequest.class))
                .cast(UserRegisterRequest.class)
                .flatMap(request -> Mono.just(User.register(request.getPhone(), request.getCountry())))
                .flatMap(user -> userRepository.save(user).zipWith(Mono.just(user)))
                .flatMap(tuple -> Mono.just(tuple.getT2()))
                .flatMap(user -> ServerResponse.created(UriComponentsBuilder.newInstance()
                                                                .path("/users/{phone}")
                                                                .buildAndExpand(user.getPhone())
                                                                .toUri()).build());

    }

My repository:

    public Mono<User> getByPhone(String phone) {
        var key = Key.builder().partitionValue(phone).build();
        return Mono.fromFuture(users.getItem(key)).map(UserMapper::map);
    }

I can't find the issue. Could somebody help me with that?

5 replies
Matt Mitchell
@mwmitchell
Is there a way to create a publisher/subscriber that will only ever work with 1 item at a time with no buffering? I'd like the publisher (sink.emitNext) to block/retry if the previous item is not fully processed yet, waiting for the request count to become > 0 or for the subscription to be disposed. I don't see a clear way to do this though, even when I implement a CoreSubscriber, doing subscriber.request(1) - the publisher seems to ignore this. It seems that the default request size is 256. Is there a way to control that? I'm using Sinks.many().unicast().onBackpressureBuffer(myQueue) for this.
3 replies
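(Editor's note: a hedged sketch of the consumer side of that idea using BaseSubscriber. The 256 observed is the default prefetch/small buffer size used by many operators, but a subscriber that manages its own demand can request exactly one element and only ask for the next when it has finished the current one; how the sink-side emitNext reacts when demand is zero still has to be handled via the EmitResult.)

    import org.reactivestreams.Subscription;
    import reactor.core.publisher.BaseSubscriber;
    import reactor.core.publisher.Flux;

    public class OneAtATimeSketch {
        public static void main(String[] args) {
            Flux.range(1, 5)
                    .subscribe(new BaseSubscriber<Integer>() {
                        @Override
                        protected void hookOnSubscribe(Subscription subscription) {
                            request(1);                 // initial demand of exactly one
                        }

                        @Override
                        protected void hookOnNext(Integer value) {
                            System.out.println("processing " + value);
                            request(1);                 // ask for the next only when done
                        }
                    });
        }
    }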
Sunny Shaw
@sunny-shaw
Hi, I am not sure what is wrong with the below code snippet. Not able to get the correct output.
Can someone point out what is wrong here?
@Test
    fun stringBuilderWithWebFLux() {
        val builder = StringBuilder()

        builder.appendln("Initial")

        val list = listOf(1,2,3)

        val res = Flux.fromIterable(list)
            .flatMap {
                apiCall(it).map { builder.appendln(it) }
            }.then(builder.toString().toMono())

        StepVerifier.create(res)
            .consumeNextWith {
                print(it) // returns just "Initial" instead of Initial \n Value 1 \n Value 2 \n Value 3
            }.verifyComplete()
    }
private fun apiCall(it: Int?): Mono<String> {
    return "Value $it".toMono()
}
1 reply
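(Editor's note: one thing worth noting is that builder.toString().toMono() builds the result Mono at assembly time, so toString() runs before any flatMap has executed; deferring it is the usual fix. A hedged Java sketch of the same idea — the Kotlin equivalent would be .then(Mono.fromCallable { builder.toString() }).)

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class DeferSketch {
        public static void main(String[] args) {
            StringBuilder builder = new StringBuilder("Initial\n");

            Mono<String> res = Flux.just(1, 2, 3)
                    .doOnNext(i -> builder.append("Value ").append(i).append('\n'))
                    // fromCallable defers toString() until the Flux above has completed
                    .then(Mono.fromCallable(builder::toString));

            res.subscribe(System.out::print);
        }
    }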
Sagar Raj
@SagarrajRaj_twitter

Hello,

I am seeing weird behavior: I am calling a mono inside doOnSuccess and it seems to be automatically getting subscribed. Is this a bug?

public class DoOnSuccessTest {

    public static void main(String[] args) {


        Flux.range(1, 2)
                .flatMap(i->monoFromCallable(i).doOnSuccess(DoOnSuccessTest::monoFromCallable))
                .subscribe(System.out::println);
        try {
            Thread.sleep(5000);
        } catch (Exception e) {
        }
    }

    public static Flux<Integer> testFlux() {
        return Flux.range(1,10);
    }

    public static Mono<Integer> monoFromCallable(Integer i) {
        return Mono.fromFuture(completableFuture()).log();
    }

    public static CompletableFuture<Integer> completableFuture() {
        Integer val = ThreadLocalRandom.current().nextInt();
        System.out.println("calling thread local random "+val);
        return CompletableFuture.completedFuture(val);
    }

}
2 replies
Robert Elliot
@Mahoney
Hello - anyone ever seen a test verify() fail to return after a thenCancel()?
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.kotlin.test.test

class FluxTest {

  @Test
  fun `test the things`() {

    val messageProcessor = MessageProcessor<String>()

    val flux = Flux
           .create<String> { sink ->
             messageProcessor.register { message -> sink.next(message) }
           }
           .share()

    messageProcessor.send("1")

    flux
      .test()
      .expectNextMatches { it == "1" }
      .thenCancel()
      .verify()
    }
}

class MessageProcessor<T> {
  private val listeners = mutableListOf<(T) -> Unit>()
  fun register(listener: (T) -> Unit) = listeners.add(listener)
  fun send(message: T) = listeners.forEach { it.invoke(message) }
}
5 replies
Mario
@mmaryo

Hello
I'm trying to move code from Stream to Flux.
I noticed that with parallelStream the map functions run on different threads, but not with Flux.
I tried this:

Flux.just(httpClient1, httpClient2)
            .publishOn(Schedulers.parallel())
            .subscribeOn(Schedulers.parallel())
            .map { http ->
                val sw = StopWatch.createStarted()
                log.info("Sending request with ${http.name}")
                val res = web3j.ethSendRawTransaction(transaction.raw).send()
                log.info("Response with ${http.name} after ${sw.getTime(TimeUnit.MILLISECONDS)} ms")
                res
            }

I see in logs :

16:04:33,389 [parallel-2] Sending request with httpClient1
16:04:33,487 [parallel-2] Response with httpClient1 after 97 ms
16:04:33,487 [parallel-2] Sending request with httpClient2
16:04:33,657 [parallel-2] Response with httpClient2 after 170 ms

So Flux is waiting for the first request to finish before executing the second.
How do I switch to really parallel code?

3 replies
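(Editor's note: a hedged Java sketch of two common ways to fan the requests out onto different threads — publishOn/subscribeOn only pick the thread for the whole sequence, they don't parallelise it. The blocking send() method here is a stand-in for the web3j call.)

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    public class ParallelSketch {
        public static void main(String[] args) {
            // Option 1: one inner Mono per client, each subscribed on its own worker.
            Flux.just("httpClient1", "httpClient2")
                    .flatMap(client -> Mono.fromCallable(() -> send(client))
                            .subscribeOn(Schedulers.boundedElastic()))
                    .blockLast();

            // Option 2: explicit parallel rails.
            Flux.just("httpClient1", "httpClient2")
                    .parallel(2)
                    .runOn(Schedulers.boundedElastic())
                    .map(ParallelSketch::send)
                    .sequential()
                    .blockLast();
        }

        static String send(String client) {
            System.out.println(Thread.currentThread().getName() + " sending with " + client);
            return client + "-response";
        }
    }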
codingforeternity
@codingforeternity
Do we have any recommendations on using WebSockets with Reactor?
1 reply
bruto1
@bruto1
now that 3.4.3 is out, can anyone please take another look at reactor/reactor-core#2585 ?
Matt Mitchell
@mwmitchell
I'm using parallel().runOn() and seeing that when I dispose the subscription, doOnTerminate is never called. What am I missing?
2 replies
Red444
@MiloS444

Hi all,

I have a question regarding paging on Couchbase DB with reactor core. I am trying to get all userIds from the database (around 300,000). The query is too slow and I believe paging would help a lot. I saw a suggestion in the docs to use Pageable but it seems it does not work. Does anyone have any idea how to achieve this?

public interface UserRepository extends ReactiveCouchbaseRepository<UserDoc, String>
{
    @Query("SELECT user.id.`value` as id "
    +  "FROM #{#n1ql.bucket} "
    + "WHERE #{#n1ql.filter}")
    Flux<StringValue> findAllUserIds();
}
2 replies
Delegue Alexandre
@larousso

Hi,

I’m using reactor kafka and I would like to handle failures by disconnecting from Kafka and reconnecting with exponential backoff.
For example, if I need to store messages in a DB and the DB is not available, I would like to let the stream crash, disconnect from Kafka without committing, and reconnect after a delay, re-reading the uncommitted messages and reprocessing them.

I used to do it with akka streams this way:

RestartSource
        // RestartSource will restart the stream if it fails
        .onFailuresWithBackoff(RestartSettings.create(
                minBackoff, maxBackoff, 0)
                        .withMaxRestarts(maxRestart, minBackoff),
        () -> 
                // When the stream start it connect to kafka 
                Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
                    .map(mess -> 
                            //...do something
                            mess
                    )    
                    .watchTermination((control, done) -> {
                        done.whenComplete((d, e) -> {
                            // Disconnect from kafka when the stream is finished 
                            control.shutdown();
                        });
                        return control;
                    })
        );

Is there a way to do this easily with reactor and reactor kafka ?
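(Editor's note: a hedged sketch of one way to approximate the akka RestartSource behaviour, assuming reactor-kafka's KafkaReceiver and reactor-core's Retry.backoff. Wrapping the receiver in Flux.defer means every retry builds a fresh consumer, and records are only acknowledged after the DB write succeeds, so uncommitted messages are re-read after a restart; whether this fits exactly depends on the reactor-kafka version and commit settings.)

    import java.time.Duration;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;
    import reactor.util.retry.Retry;

    public class RestartingConsumerSketch {

        static Flux<?> consume(ReceiverOptions<String, String> options) {
            return Flux.defer(() -> KafkaReceiver.create(options).receive())   // fresh consumer per (re)subscription
                    .concatMap(record ->
                            storeInDatabase(record.value())                    // may fail while the DB is down
                                    .doOnSuccess(ok -> record.receiverOffset().acknowledge()))
                    .retryWhen(Retry.backoff(10, Duration.ofSeconds(1))        // exponential backoff between restarts
                            .maxBackoff(Duration.ofMinutes(1)));
        }

        // placeholder for the real DB call
        static Mono<Void> storeInDatabase(String value) {
            return Mono.empty();
        }
    }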

ruslan
@unoexperto
Guys, what could be the reason for the resourceCleanup callback of Flux.using to be executed on the wrong thread? It's not the thread that I specified in subscribeOn.
7 replies
ruslan
@unoexperto

Folks, is there a way to create a Flux<String>.buffer() with a custom condition such as "sum of lengths is greater than X OR item count equals Y", whichever comes first? I feel like Flux.buffer(Publisher<>) is what I should use but I can't wrap my head around it.

Or something like stateful Flux.window() would do too.

bruto1
@bruto1
Looks like a custom window predicate can help
ruslan
@unoexperto
@bruto1 Are you referring to Flux.windowUntil(Predicate<T>) ? Is it ok that predicate has mutable state ?
bruto1
@bruto1
Yes, that one. Nothing wrong with mutable state as long as you take the necessary precautions (if your publisher is async)
windowWhile probably matches your case better
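(Editor's note: a hedged sketch of the stateful-predicate idea with bufferUntil, the buffering analogue of windowUntil: the predicate accumulates length and count, closes the buffer when either limit is hit, and resets its state. Fine for a sequential source; if emissions can be concurrent, the state would need to be guarded.)

    import java.util.List;
    import java.util.function.Predicate;
    import reactor.core.publisher.Flux;

    public class SizeOrCountBufferSketch {

        static Flux<List<String>> bufferBySizeOrCount(Flux<String> source, int maxChars, int maxItems) {
            Predicate<String> closeBuffer = new Predicate<String>() {
                private int chars;
                private int items;

                @Override
                public boolean test(String s) {
                    chars += s.length();
                    items++;
                    if (chars >= maxChars || items >= maxItems) {
                        chars = 0;
                        items = 0;
                        return true;   // close the current buffer, including this element
                    }
                    return false;
                }
            };
            return source.bufferUntil(closeBuffer);
        }

        public static void main(String[] args) {
            bufferBySizeOrCount(Flux.just("aa", "bbb", "c", "dddd", "e"), 5, 3)
                    .subscribe(System.out::println);
        }
    }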
Braffolk
@Braffolk

Hello, I'm using reactor with spring. I've run into a conundrum trying to connect an RSocket connection to the output of another incoming connection from a 3rd service to the server, which sends data to the server that is associated with that specific RSocket connection. So a client connects to a streaming service and sends a job, which the streaming/API service then sends to a compute service to resolve. Previously this started a websocket connection between the streaming and compute services, waited for the results to come in, and then pushed the results back into the RSocket. The websocket protocol however has issues with streaming, so I have to replace that.

A solution I'm attempting is to create a new RSocket connection from the compute service to the streaming service. This would use a @MessageMapping("compute-output-stream/{jobId}") path where it would send all the data. The issue is that this connection is started separately from the initial connection, and unlike the scenario where the client's connection triggers a websocket between compute and stream, there is no obvious way to connect the compute-started RSocket back to the original requester. What could help is that both RSocket streams know the jobId. The client that starts the initial connection does not.

So my question is whether there is a simple way to somehow connect the two streams. I've tried finding examples and looked through the documentation, but it's hard to find anything that could help. Maybe I could somehow dynamically subscribe to a specific compute-output-stream/jobId, but I haven't found anything on it. Does anyone know any similar scenarios and common solutions or examples for this?

hmantri05
@hmantri05
Hi, I am trying to mock WebClient to write a unit test case for my class. I am using
webClient.post().uri("url").headers(<headers>).body(publisher, List.class).retrieve().bodyToMono(Response.class)
My mocking is successful up to headers() but after that it's giving me null, and I think the issue is with the ArgumentMatcher from Mockito. The full detail is given in this stackoverflow question:
https://stackoverflow.com/questions/66326800/what-is-the-argument-matcher-for-a-mono-type-parameter
Can someone help please?
4 replies
Ioannis Noukakis
@ioannisNoukakis
Hello guys. First of all, thanks for all your efforts making this incredible project! I'm looking for an operator similar to https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/statefulMapConcat.html. Any ideas?
hbutalia
@hbutalia

Getting below error when using - .parallel().runOn(Schedulers.boundedElastic())

io.netty.handler.ssl.SslHandshakeTimeoutException: handshake timed out after 10000ms
at io.netty.handler.ssl.SslHandler$5.run(SslHandler.java:2062)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoCreate] :
reactor.core.publisher.Mono.create
reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:143)
Error has been observed at the following site(s):
| Mono.create ⇢ at reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:143)
| ⇢ at reactor.netty.tcp.TcpResources.acquire(TcpResources.java:213)
| ⇢ at reactor.netty.tcp.TcpClientConnect.connect(TcpClientConnect.java:51)
| ⇢ at reactor.netty.tcp.TcpClientOperator.connect(TcpClientOperator.java:43)
| Mono.create ⇢ at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:292)
| Flux.concatMap ⇢ at reactor.util.retry.RetrySpec.generateCompanion(RetrySpec.java:324)
|_ Mono.retryWhen ⇢ at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:329)

1 reply
richard lee
@rockiee281
Hello guys! How can I do some special action with the last element in a flux? I.e. I want to add 1 to each element of the flux but 5 to the last element: [1, 2, 3, 4, 5] -> [2, 3, 4, 5, 10]
4 replies
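(Editor's note: a hedged sketch of one way to do it when the source can be subscribed twice or cached: treat the last element separately with skipLast/takeLast and concatenate. For a cold, replayable source this keeps the order [2, 3, 4, 5, 10].)

    import reactor.core.publisher.Flux;

    public class LastElementSketch {
        public static void main(String[] args) {
            // cache() so both subscriptions see the same data; just() is already replayable,
            // but cache matters for a general source.
            Flux<Integer> source = Flux.just(1, 2, 3, 4, 5).cache();

            Flux.concat(
                    source.skipLast(1).map(i -> i + 1),  // everything but the last: +1
                    source.takeLast(1).map(i -> i + 5))  // the last element: +5
                .subscribe(System.out::println);         // 2, 3, 4, 5, 10
        }
    }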
James Bodkin
@JBodkin-LH
I've noticed that when I run a chain in parallel mode whilst using flatMap, it appears to produce values sequentially on different threads. Why does that happen?
e.g.
customers
    .parallel(3)
    .runOn(Schedulers.boundedElastic())
    .flatMap(customer -> ...) // Creates a new flux that matches the customer to another stream containing the updated details
    .map(this::updateCustomer)
    .map(this::addAddress)
Kamal Mahmud
@kamalhm

I've created this random dummy method to insert a Member n times, but it seems to save the same member every time:

    return Mono.just(Member.builder()
            .name(RandomStringUtils.randomAlphabetic(5))
            .build())
            .flatMap(memberRepository::save)
            .repeat(number);

anyone know how to fix it?

1 reply
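(Editor's note: a hedged sketch of the usual fix — Mono.just captures one already-built Member, and repeat only resubscribes to that same value; building the Member inside Mono.fromCallable (or Mono.defer) makes each resubscription create a fresh one. Member, RandomStringUtils and memberRepository are taken from the question.)

    return Mono.fromCallable(() -> Member.builder()
            .name(RandomStringUtils.randomAlphabetic(5))
            .build())                        // a new random Member per subscription
            .flatMap(memberRepository::save)
            .repeat(number);                 // repeat resubscribes, so each pass builds a new Member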
Gabriel Popovici
@popovici.gabriel_gitlab
hello; much appreciated if anyone can give me a hint on what might be the cause of a strange HTTP status code 504 I am receiving when consuming an endpoint inside an application built with Spring Boot parent 2.4.3, project reactor netty as the server, and spring data couchbase; I have no clue as there are no logs when this is happening;
I have activated the project reactor tools agent and scheduler metrics; inside the business logic there are also WebClient calls to external 3rd-party services, all happening on the main event loop (default loop)
image.png
the only hint I see (no logs at INFO level) is this screenshot; perhaps anyone has an idea what to do under these circumstances; much appreciated
Gabriel Popovici
@popovici.gabriel_gitlab
hello; what is the proper way to enable access logs with Netty? I am using Netty with Spring Boot (https://docs.spring.io/spring-boot/docs/2.4.3/reference/htmlsingle/#howto-configure-accesslogs) but there is no official guide on how to set this up in version 2.4.3.
Moncef AOUDIA
@aoudiamoncef
@popovici.gabriel_gitlab: https://cloud.spring.io/spring-cloud-gateway/multi/multi__reactor_netty_access_logs.html, I tested it on my project, and it works as expected.
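(Editor's note: for reference, the approach in that guide boils down to a single JVM system property documented for reactor-netty; with Spring Boot 2.4.x it can be passed on the command line or set early in main() before Netty starts. Application below is a placeholder for the actual Boot application class.)

    // either as a JVM flag:
    //   -Dreactor.netty.http.server.accessLogEnabled=true
    // or programmatically, before the Netty server is created:
    public static void main(String[] args) {
        System.setProperty("reactor.netty.http.server.accessLogEnabled", "true");
        SpringApplication.run(Application.class, args);
    }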
Gabriel Popovici
@popovici.gabriel_gitlab
thanks