All generic questions around Reactor. For advanced questions you can also try #reactor-core and #reactor-netty
I see this has been asked before, but I did not find an answer to the question:
Is there a specific reason for the move of AssertSubscriber (and earlier TestSubscriber) to the test source-set of reactor-core? I think it would be great to have its functionality publicly exposed via reactor-test.
Sinks.EmitResult.FAIL_OVERFLOW - how do I deal with this?
Flux.range(1, 11)
.publish(f -> f.filter(i -> i % 2 == 0).log("even").map(i -> 2).switchIfEmpty(f))
.publish(f -> f.filter(i -> i % 2 != 0).log("odd").map(i -> 3))
.log("output")
.subscribe();
Hooks.KEY_ON_* constants not public?
Hello, I have a question about the expected behaviour of .publish with error signals. Testing the operator on Flux and Mono is giving me different results in that specific case.
Testing
StepVerifier.create(Mono.<Integer>error(new RuntimeException("Error")).publish(x -> x))
.expectErrorMatches(throwable -> throwable.getMessage().equals("Error"))
.verify();
fails while the same for Flux passes, I'm wondering which would be the expected behaviour?
Hi!
I need to execute a blocking call via JdbcTemplate in my reactive program. I found in the official docs that the best way to do this is to use Schedulers.boundedElastic(). But when I tried to execute it on Schedulers.parallel(), I got the same non-blocking result. Which of Schedulers.parallel() and Schedulers.boundedElastic() is preferable to use?
Hi, I am new to WebFlux/reactive programming. I have a question: I am trying to add a filter, and once the filter condition succeeds, I call the request handler method using "next.handle(request)".
Below is my RouterFunction:
public RouterFunction<ServerResponse> sampleRouter() {
return route(POST(PATH).and(accept(APPLICATION_JSON)), requestHandler::login).filter(new FilterHandler());
}
Inside the filter() method I check the condition, and if it is true, the filter should call the request handler function "login()":
public Mono<ServerResponse> login(final ServerRequest serverRequest) {
final String serviceName = "login";
LOGGER.info("Service Method Invoked : {}", serviceName);
return (Mono<ServerResponse>) serverRequest.bodyToMono(Request.class)
.flatMap(request -> serviceCaller.callPost(serverRequest, request, Object.class, serviceName))
.flatMap(response -> {
return ServerResponse.ok().body(fromObject(response));
}).onErrorResume(error -> errorHandler.throwableError(error));
}
The overridden filter method in FilterHandler.java is as below:
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
Mono<Request> requestMono = request.bodyToMono(Request.class);
Mono<Data> dataMono = data.retrieveData(request, Data.class);
return Mono.zip(requestMono, dataMono).flatMap(combinedData -> {
if (combinedData.getT1().getId().equals(combinedData.getT2().getId())) {
return next.handle(request); // this will be calling the request handler method login()
} else {
return ServerResponse.status(403).build();
}
});
}
But in this case the login method does not return the "response", as it returns before reaching the ".flatMap(response -> { return ServerResponse.ok().body(fromObject(response)); })" step of the login method.
Is there an appropriate way in reactive programming to return the response from the login method when it is invoked within the flatMap of Mono.zip(requestMono, dataMono)?
Hi there,
How is it possible
Mono.just(someValue)
.doOnNext(err -> log.error("on next..."))
.flatMap(client::execute)
.doOnError(err -> log.error("on err... {}", err.hashCode()))
and I see such logs:
18:04:00.023Z ERROR [boundedElastic-3] - on next...
18:04:00.826Z ERROR [reactor-http-kqueue-12] - on err... 1282189168
18:04:00.826Z ERROR [reactor-http-kqueue-11] - on err... 1561963113
If it's a Mono, then I should receive a single value or a single error. But I see two errors.
Hello, I have simple logic to register a user if it does not exist. If it exists, an error should be thrown. Unfortunately, switchIfEmpty (when the user is not found) is not being called.
I'm using the DynamoDB async client (it uses CompletableFuture).
My handler:
public Mono<ServerResponse> register(ServerRequest serverRequest) {
return serverRequest.bodyToMono(UserRegisterRequest.class)
.flatMap(request -> userRepository.getByPhone(request.getPhone()))
.flatMap(existing -> Mono.error(new UserAlreadyExistsException(existing.getPhone())))
.switchIfEmpty(serverRequest.bodyToMono(UserRegisterRequest.class))
.cast(UserRegisterRequest.class)
.flatMap(request -> Mono.just(User.register(request.getPhone(), request.getCountry())))
.flatMap(user -> userRepository.save(user).zipWith(Mono.just(user)))
.flatMap(tuple -> Mono.just(tuple.getT2()))
.flatMap(user -> ServerResponse.created(UriComponentsBuilder.newInstance()
.path("/users/{phone}")
.buildAndExpand(user.getPhone())
.toUri()).build());
}
My repository:
public Mono<User> getByPhone(String phone) {
var key = Key.builder().partitionValue(phone).build();
return Mono.fromFuture(users.getItem(key)).map(UserMapper::map);
}
I can't find the issue. Could somebody help me with that?
sink.emitNext) to block/retry if the previous item is not fully processed yet, waiting for the request count to become > 0 or for the subscription to be disposed. I don't see a clear way to do this though, even when I implement a CoreSubscriber, doing subscriber.request(1) - the publisher seems to ignore this. It seems that the default request size is 256. Is there a way to control that? I'm using Sinks.many().unicast().onBackpressureBuffer(myQueue) for this.
@Test
fun stringBuilderWithWebFLux() {
val builder = StringBuilder()
builder.appendln("Initial")
val list = listOf(1,2,3)
val res = Flux.fromIterable(list)
.flatMap {
apiCall(it).map { builder.appendln(it) }
}.then(builder.toString().toMono())
StepVerifier.create(res)
.consumeNextWith {
print(it) // returns Initial instead of Initial \n Value 1 \n Value 2 \n Value 3
}.verifyComplete()
}
private fun apiCall(it: Int?): Mono<String> {
return "Value $it".toMono()
}
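One possible reading of the test above: builder.toString() is an ordinary method argument, so it is evaluated while the chain is being assembled, before any apiCall result has been appended. The sketch below shows that timing difference with plain JDK classes only (no Reactor); the class and method names are made up for illustration. In Reactor, the deferred form would usually be written with Mono.fromSupplier or Mono.defer.

```java
import java.util.List;
import java.util.function.Supplier;

public class EagerVsDeferred {

    /** Returns {eagerResult, deferredResult} after three appends. */
    public static String[] run() {
        StringBuilder builder = new StringBuilder("Initial\n");

        // Eager: the string is captured now, before anything else is appended.
        String eager = builder.toString();

        // Deferred: the string is only built when get() is called later.
        Supplier<String> deferred = builder::toString;

        // Stand-in for the work that happens once the pipeline actually runs.
        for (int i : List.of(1, 2, 3)) {
            builder.append("Value ").append(i).append('\n');
        }
        return new String[] { eager, deferred.get() };
    }

    public static void main(String[] args) {
        String[] results = run();
        System.out.print("eager:\n" + results[0]);
        System.out.print("deferred:\n" + results[1]);
    }
}
```

The eager result contains only the first line, while the deferred one sees all four lines.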
Hello,
I am seeing a weird behavior, I am calling a mono inside doOnSuccess & it seems to be automatically getting subscribed. Is this a bug?
public class DoOnSuccessTest {
public static void main(String[] args) {
Flux.range(1, 2)
.flatMap(i->monoFromCallable(i).doOnSuccess(DoOnSuccessTest::monoFromCallable))
.subscribe(System.out::println);
try {
Thread.sleep(5000);
} catch (Exception e) {
}
}
public static Flux<Integer> testFlux() {
return Flux.range(1,10);
}
public static Mono<Integer> monoFromCallable(Integer i) {
return Mono.fromFuture(completableFuture()).log();
}
public static CompletableFuture<Integer> completableFuture() {
Integer val = ThreadLocalRandom.current().nextInt();
System.out.println("calling thread local random "+val);
return CompletableFuture.completedFuture(val);
}
}
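A possible explanation, sketched with plain JDK classes: a method that builds a CompletableFuture runs its body as soon as it is called, whether or not anything consumes the result. In the snippet above, doOnSuccess invokes monoFromCallable as a callback, which calls completableFuture(), so the println fires even though the returned Mono is never subscribed. The counter and class name below are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class EagerFuture {
    public static final AtomicInteger CALLS = new AtomicInteger();

    // Mirrors completableFuture() above: the side effect runs on every call.
    public static CompletableFuture<Integer> completableFuture() {
        return CompletableFuture.completedFuture(CALLS.incrementAndGet());
    }

    public static void main(String[] args) {
        // The result is discarded, yet the method body has already run.
        completableFuture();
        System.out.println("calls after eager invocation: " + CALLS.get());

        // Wrapping the call in a Supplier postpones it until get() is invoked.
        Supplier<CompletableFuture<Integer>> lazy = EagerFuture::completableFuture;
        System.out.println("calls after creating supplier: " + CALLS.get());

        lazy.get();
        System.out.println("calls after supplier.get(): " + CALLS.get());
    }
}
```

If the side effect should only happen on subscription, wrapping the call in Mono.defer is one way to postpone it.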
verify() fail to return after a thenCancel()?
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import reactor.kotlin.test.test
class FluxTest {
@Test
fun `test the things`() {
val messageProcessor = MessageProcessor<String>()
val flux = Flux
.create<String> { sink ->
messageProcessor.register { message -> sink.next(message) }
}
.share()
messageProcessor.send("1")
flux
.test()
.expectNextMatches { it == "1" }
.thenCancel()
.verify()
}
}
class MessageProcessor<T> {
private val listeners = mutableListOf<(T) -> Unit>()
fun register(listener: (T) -> Unit) = listeners.add(listener)
fun send(message: T) = listeners.forEach { it.invoke(message) }
}
Hello,
I am trying to move code from Stream to Flux.
I noticed that with a parallel Stream, map functions run on different threads,
but not with Flux.
I tried this:
Flux.just(httpClient1, httpClient2)
.publishOn(Schedulers.parallel())
.subscribeOn(Schedulers.parallel())
.map { http ->
val sw = StopWatch.createStarted()
log.info("Sending request with ${http.name}")
val res = web3j.ethSendRawTransaction(transaction.raw).send()
log.info("Response with ${http.name} after ${sw.getTime(TimeUnit.MILLISECONDS)} ms")
res
}
I see in logs :
16:04:33,389 [parallel-2] Sending request with httpClient1
16:04:33,487 [parallel-2] Response with httpClient1 after 97 ms
16:04:33,487 [parallel-2] Sending request with httpClient2
16:04:33,657 [parallel-2] Response with httpClient2 after 170 ms
So Flux waits for the first request to finish before executing the second.
How do I switch to truly parallel code?
Hi all,
I have a question regarding paging on Couchbase DB with reactor core. I am trying to get all userIds from the database (around 300,000). The query is too slow and I believe paging would help a lot. I saw a suggestion in the docs to use Pageable, but it seems it does not work. Does anyone have any idea how to achieve this?
public interface UserRepository extends ReactiveCouchbaseRepository<UserDoc, String>
{
@Query("SELECT user.id.`value` as id "
+ "FROM #{#n1ql.bucket} "
+ "WHERE #{#n1ql.filter}")
Flux<StringValue> findAllUserIds();
}
Hi,
I’m using reactor kafka and I would like to handle failures by disconnecting from kafka and reconnecting with exponential backoff.
For example, if I need to store messages in a DB and the DB is not available, I would like to let the stream crash, disconnect from kafka without committing, and reconnect after a delay, rereading the uncommitted messages and reprocessing them.
I used to do it with akka stream this way:
RestartSource
// RestartSource restarts the stream if it fails
.onFailuresWithBackoff(RestartSettings.create(
minBackoff, maxBackoff, 0)
.withMaxRestarts(maxRestart, minBackoff),
() ->
// When the stream starts, it connects to kafka
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.map(mess ->
//...do something
mess
)
.watchTermination((control, done) -> {
done.whenComplete((d, e) -> {
// Disconnect from kafka when the stream is finished
control.shutdown();
});
return control;
})
);
Is there a way to do this easily with reactor and reactor kafka?
Folks, is there a way to create Flux<String>.buffer() with a custom condition such as "sum of lengths is greater than X OR item count equals Y", whichever comes first? I feel like Flux.buffer(Publisher<>) is what I should use but I can't wrap my head around it.
Or something like a stateful Flux.window() would do too.
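To pin down the condition being asked about, here is a minimal plain-Java sketch (no Reactor) of "close the batch when the sum of lengths exceeds X or the item count reaches Y, whichever comes first". The class name, method name, and parameters are hypothetical. The same two counters could probably live inside a stateful predicate passed to Flux.bufferUntil, which by default includes the triggering element in the buffer it closes. That is an untested suggestion, and such a predicate would need to be created per subscriber.

```java
import java.util.ArrayList;
import java.util.List;

public class SizeOrCountBatcher {

    /**
     * Closes the current batch as soon as the sum of lengths exceeds
     * maxTotalLength or the batch holds maxCount items, whichever comes first.
     * The item that trips the condition stays in the batch it closed.
     */
    public static List<List<String>> batch(List<String> items, int maxTotalLength, int maxCount) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int totalLength = 0;
        for (String item : items) {
            current.add(item);
            totalLength += item.length();
            if (totalLength > maxTotalLength || current.size() == maxCount) {
                batches.add(current);
                current = new ArrayList<>();
                totalLength = 0;
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // flush the trailing partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        // prints [[aa, bbb], [c, d, e], [ffffff]]
        System.out.println(batch(List.of("aa", "bbb", "c", "d", "e", "ffffff"), 4, 3));
    }
}
```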
Hello, I'm using reactor with spring. I've run into a conundrum with trying to connect a rsocket connection to the output of another incoming connection from a 3rd service to the server, which sends data to the server which is associated with that specific rsocket connection. So a client connects to a streaming service and sends a job, which the streaming/API service then sends to a compute service to resolve. Previously this started a websocket connection between the streaming and compute and waited for the results to come in and then pushed the results back into the RSocket. The websocket protocol however has issues with streaming, so I have to replace that.
A solution I'm attempting is to create a new RSocket connection from the compute service to the streaming service. This would use the @MessageMapping("compute-output-stream/{jobId}") path where it would send all the data. The issue is that this connection is started separately from the initial connection and, unlike the scenario where the client's connection triggers a websocket between the compute and stream, there is no obvious way to connect the compute-started rsocket back to the original requester. What could help is that both rsocket streams do know the jobId. The client that starts the initial connection does not.
So my question is whether there is a simple way to somehow connect the two streams. I've tried finding examples and looking through the documentation, but it's hard to find anything that could help. Maybe I could somehow dynamically subscribe to a specific compute-output-stream/jobId, but I haven't found anything on it. Does anyone know of any similar scenarios and common solutions or examples for this?
webClient.post().uri("url").headers(<headers>).body(publisher, List.class).retrieve().bodyToMono(Response.class)
headers but it's giving me null when and I think the issue is with the ArgumentMatcher from Mockito. The full detail is given on this stackoverflow question.
Getting the below error when using .parallel().runOn(Schedulers.boundedElastic()):
io.netty.handler.ssl.SslHandshakeTimeoutException: handshake timed out after 10000ms
at io.netty.handler.ssl.SslHandler$5.run(SslHandler.java:2062)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoCreate] :
reactor.core.publisher.Mono.create
reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:143)
Error has been observed at the following site(s):
| Mono.create ⇢ at reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:143)
| ⇢ at reactor.netty.tcp.TcpResources.acquire(TcpResources.java:213)
| ⇢ at reactor.netty.tcp.TcpClientConnect.connect(TcpClientConnect.java:51)
| ⇢ at reactor.netty.tcp.TcpClientOperator.connect(TcpClientOperator.java:43)
| Mono.create ⇢ at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:292)
| Flux.concatMap ⇢ at reactor.util.retry.RetrySpec.generateCompanion(RetrySpec.java:324)
|_ Mono.retryWhen ⇢ at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:329)
customers
.parallel(3)
.runOn(Schedulers.boundedElastic())
.flatMap(customer -> ...) // Creates a new flux that matches the customer to the another stream containing the updated details
.map(this::updateCustomer)
.map(this::addAddress)
I've created this dummy method to insert a random Member n times, but it seems to save the same member every time:
return Mono.just(Member.builder()
.name(RandomStringUtils.randomAlphabetic(5))
.build())
.flatMap(memberRepository::save)
.repeat(number);
Does anyone know how to fix it?
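A likely cause, shown here as a plain-Java analogy rather than Reactor code: the argument to Mono.just(...) is built once, so every repetition re-emits that same instance. The helper below (a hypothetical name) simulates "run the same source n times" and compares a value built up front with one built on every run. In Reactor, Mono.fromSupplier or Mono.defer would be the usual way to rebuild the value per subscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RepeatSource {

    // Simulates subscribing to the same source several times and collects what each run sees.
    public static <T> List<T> repeat(Supplier<T> source, int times) {
        List<T> seen = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            seen.add(source.get());
        }
        return seen;
    }

    public static void main(String[] args) {
        AtomicInteger ids = new AtomicInteger();

        // Value built once up front, like the argument to Mono.just(...).
        String fixed = "member-" + ids.incrementAndGet();
        System.out.println(repeat(() -> fixed, 3));

        // Value built on every run, like a deferred source.
        System.out.println(repeat(() -> "member-" + ids.incrementAndGet(), 3));
    }
}
```

The first line prints the same member three times; the second prints three different ones.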