Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
mplain
@mplain
I want to have two router endpoints, one POST that receives some data and sends it to a "channel", and the other GET that sends out everything from that channel as Server-Sent Events
I've seen such channels in Go
Is there something similar in Reactor?
3 replies
mplain
@mplain

I'm using Spring WebClient (based afaik on reactor-netty-httpclient?)
I'm sending a stream or 5 values with a one second delay (Flux<Object>, content-type = application/x-ndjson) to the server, and receiving the response as a Flux as well
I expect to see it the logs something like this:

object #1 send
response #1 received
object #2 send
response #2 received
...

Objects are sent one-by-one, and responses are also received one-by-one
but WebClient starts receiving the response only after it finishes to send the request
I've found a similar question on SO: https://stackoverflow.com/questions/52113235
In it, @bclozel says that Spring WebClient and Reactor Netty HttpClient only start processing the response after they're finished processing the request
Also, @violetagg told me that if the Netty server sends a response before it it finished processing the request body, then the remaining request body is discarded (?), as per the HTTP spec (?)
Could you please confirm that this is the way WebFlux and Reactor Netty work by design?

Interestingly, another answer on SO says that Jetty-based WebClient works differently, it is able to start receiving the response before finishing to process the request
Is the restriction on the client side, or server side, or both?

Violeta Georgieva
@violetagg

@mplain

Also, @violetagg told me that if the Netty server sends a response before it it finished processing the request body, then the remaining request body is discarded (?), as per the HTTP spec (?)

And this is true if you finalise the response and not if you start writing the response

For example this code will start echoing the received incoming data and will continue to do it untill the last message from the client is received
HttpServer server =
        HttpServer.create()
                  .port(PORT)
                  .route(r -> r.post("/echo",
                          (req, res) -> res.header(CONTENT_TYPE, TEXT_PLAIN)
                                           .send(req.receive().retain())));
Violeta Georgieva
@violetagg
And what @bclozel relpied in the mentioned SO question is so true - there is no guarantee that the sequance above that you expect will happen espetially nowadays with so many proxies and loadbalancers between the client and the server

then the remaining request body is discarded (?), as per the HTTP spec (?)

https://tools.ietf.org/html/rfc7230#section-6.3

A server MUST read
   the entire request message body or close the connection after sending
   its response, since otherwise the remaining data on a persistent
   connection would be misinterpreted as the next request.
mplain
@mplain
thank you!
I'm trying out reactor-kafka with blockhound
blockhound barks at KafkaConsumer.poll being a blocking method
I've found a github issue about it: reactor/reactor-kafka#63
should I move that flux to another scheduler to make the dog happy, or is there not point really?
11 replies
Knut Schleßelmann
@kschlesselmann
I have a publisher that only gets executed if I have acquired a Lock beforehand. After execution the Lock has to be released. Now doFinally sadly executes after the completion of the Publisher so there is no way (that I know of) that I can test this behavior. What would you suggest how I should handle this?
11 replies
Jeevjyot Singh Chhabda
@jeevjyot
Hi,
I have a List<String> and Mono<Map<String,Object>>. I want to look each entry of List<String> in Mono<Map<String,Object>> . What is the right way to achieve this in reactive programming?
3 replies
Rahul Narula
@ranarula

Hi, I have a handler method in which I need to persist data in DB and once persisted, I need to send out some events (http call reactive webclient) with the persisted data. I want to return the control back once the DB operation is completed. The sending of event parts need to happen as a background job. I am having issue assembling the Mono operator chain for such. Here is something what I have

Mono<ServerResponse> handleRequest(ServerRequest request) {
    var persistedMono = Mono.just("Hello")
        .flatMap(this::persistData)
        .doOnError(this::sendErrorEvent);

    persistedMono.publishOn(Schedulers.parallel())
        .doOnSuccess(this::sendEvents).subscribe();

    return persistedMono.flatMap(result -> ServerResponse.ok()
        .body(BodyInserters.fromValue(result)));
  }

The problem here is the subscription for persistMono happens twice and persistData is called twice.

How can I address this issue and send the events in the background.

Thanks in advance for any suggestions/solution

9 replies
rbrose
@rbrose
How i can solve this with only 1 SELECT Query?
    public Flux<TopicCommentResponse> topicComments(String language, long topicId) {
        return databaseClient.execute("" +
                "SELECT tc.*, u.username FROM topic_comment tc JOIN users u on tc.user_id = u.id WHERE tc.topic_id=:topicId AND tc.parent_id is null order by tc.created_at desc")
                .bind("topicId", topicId)
                .as(TopicCommentResponse.class)
                .fetch()
                .all()
                .flatMap(comment ->
                     Mono.just(comment)
                            .zipWith(databaseClient.execute("" +
                                    "SELECT tc.*, u.username FROM topic_comment tc JOIN users u on tc.user_id = u.id WHERE tc.topic_id=:topicId AND tc.parent_id=:parentId order by tc.created_at desc")
                                    .bind("topicId", topicId)
                                    .bind("parentId", comment.getId())
                                    .as(TopicCommentResponse.class)
                                    .fetch()
                                    .all().collectList())
                            .map(tupla ->  tupla.getT1().withAnswers(tupla.getT2()))
                );
    }
Dan Cohen-Smith
@dancohensmith
hey i see a new reactor kafka
does this mean the back pressure bug is fixed?
6 replies
does it actually respect backpressure now?
R2J
@rajeev-jha1988
did Spring R2DBC provide the support for jpaSpecification query
Mritunjay Dubey
@mddubey

Hello people!

I am trying to implement reactive rabbit-mq in my spring webflux application, however I am facing issues. Following the tutorial from reactor rabbitmq reference guide, this is what I am trying to do:

sender.send(Mono.just(message))
                    .doOnSuccess(a-> System.out.println("Delivered"))
                    .doOnError(e -> {
                        logger.error("error", e);
                    })
                    .subscribe()

Please let us know what might be going wrong! Have not been able to find a solution over internet

1 reply
Rob Gillen
@borgille
Hi everyone! I'm trying to wrap my head around something that is probably simple, but I haven't found a way to do it, and maybe there is a better way than I am thinking of. I would like to collect the values emitted by a Flux into a PriorityQueue to order them before emitting them to downstream subscribers. This seems like it could be achieved through a custom Processor, but there seems to be a consensus discouraging that. If not via custom Processor, then how?
12 replies
vireshwali
@vireshwali

Hi everyone. I am new to spring reactor and need some help implementing a simple use case. So i have two monos (mono1 and mono2) of same type Mono<List<Type1>> ....Type 1 is something like this

public class ErrorCountsByShortNameDto implements Serializable {
    private String errorShortName;
    private String deploymentName;
    private Long errorCount;
}

Now i need to loop thee list in the 2 monos and match the Type1 items whose errorShortName is same. Can anyone help me point in the right direction or any older message here that does this.

vireshwali
@vireshwali
any help for ^^ one?
Ismail Marmoush
@IsmailMarmoush
Hi, I'm using Mono.fromFuture(cf) How do I keep repeating this mono (something like .repeat()) but stop it as soon as cf isn't producing output. Hence you can consider cf a reactive api for a stream, but since it does that once I don't know how to keep polling data and stop when stream is finished.
4 replies
Ismail Marmoush
@IsmailMarmoush
@vireshwali I guess you'd zip the monos and map the tuple applying the matching function
1 reply
bruto1
@bruto1
a question for @bsideup , I guess? How do you serialize signal emissions with a Replay sink to avoid EmitResult.FAIL_NON_SERIALIZED?
19 replies
mplain
@mplain
I'm using Blockhound and kotlinx.coroutines-debug
Blockhound.install() throws a blocking call error for webClient.<...>.awaitBody()
Blockhound.builder().with(CoroutinesBlockHoundIntegration()).install() does not throw
could there be a conflict with some other basic BlockHoundintegration?
29 replies
dowgiallom
@dowgiallom

Hello All. Im trying to use the Sinks API to pass signals to consumers in certain way but looks like I'm doing something incorrectly. Could somebody help/take a look?

What I would like to do is:

sink = Sinks.many().replay().limit(EVENT_AGE_LIMIT);

and then expose it as:

sink.asFlux()

I wrote a simple test that ensures that a new subscriber still receives event that is less that EVENT_AGE_LIMIT old (lets say 2000ms):

        // arrange
        CountDownLatch received = new CountDownLatch(1);
        Sinks.Many<Object> sink = Sinks.many().replay().limit(Duration.ofMillis(2000));

        // act
        Sinks.EmitResult emittedResult = sink.tryEmitNext("event A");

        Thread.sleep(1800);

        sink.asFlux().publishOn(Schedulers.boundedElastic()).subscribe(e -> received.countDown());

        // verify
        assertThat(emittedResult.isSuccess()).isTrue();
        assertThat(received.await(200, TimeUnit.MILLISECONDS)).isTrue();

The test fails at the assertion in the last line. However, if I change the Sink to: Sinks.Many<Object> sink = Sinks.many().replay().all(); then it passes.
Does anybody know what I could be doing wrong?

16 replies
kaladhar
@kaladhar-mummadi
Hi,
Is there a way to skip stacktrace ( the 2nd part of log with all the reactor.core.publisher details ) from logs when using ReactorDebugAgent.init() ?
Thanks in advance
turbolag
@RahulKushwaha
Hi,
I am writing an application where I need to fetch a lock before running a bunch of operations. All of these are asynchronous operations and fully using reactive streams. I am trying to leverage doFirst to get a lock and doFinally to release the lock. Is this the right pattern or there is a more idiomatic way to do this?
Ismail Marmoush
@IsmailMarmoush

@RahulKushwaha what a strange coincidence, I'm doing exactly the same right now and was about to ask if it's alright or not
except that I'm not using doFirst because doFirst runs even before subscription which as far as I understood this would be right away, and that means your code would lock even before it's subscribed to, and I doubt you would want that.

Here's my code

@Override
  public <V> Mono<V> transaction(Id id, Callable<V> action) {
    return Mono.fromCallable(action).doOnSubscribe(s -> {
      locks.putIfAbsent(id, new ReentrantLock());
      locks.get(id).lock();
    }).doFinally(f -> locks.get(id).unlock());
  }

Hope one of the experts here would let us know if this is abuse of reactor or a totally fine thing.
P.S I'm locking on a certain id through a map.

1 reply
Ugnius Rumsevicius
@UgiR
wont you be blocking with locks?
Ismail Marmoush
@IsmailMarmoush
@UgiR I guess so, my only excuse was that this function is part of an "in memory" db with add() and get() operations and I needed a way to synchronize updates on a certain Id.
Do you think of a better way ?
Mister_Jesus
@Bittuw

Hello everybody, why this code doesn't work as expected?

@Test
@SneakyThrows
void simple() {
    final var emit = Sinks.many().multicast().onBackpressureBuffer();
    Flux.interval(Duration.ofSeconds(5), Schedulers.newSingle("special-timer")).log()
            .filterWhen(ll -> emit.asFlux().next().hasElement()).log()
            .subscribeOn(Schedulers.newSingle("under-test")).subscribe(ll -> System.out.println(ll));

    for (int i = 0; i <= 100; i += 1) {
        emit.tryEmitNext(0L);
        TimeUnit.SECONDS.sleep(10);
    }
}

I thought that each subscriber in filterWhen statement will wait for one element, but actually only the first subscription ends. All others still wait for one element...

1 reply
Prashant Pandey
@limafoxtrottango

I need some help understanding the threading behaviour of Mono.fromCallable(..). I have this code:

  public static void main(String[] args) throws InterruptedException {
    Flux.range(1, 100_000)
        .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
        .flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic()))
        .subscribe();
    System.out.println("Here");
    Thread.sleep(Integer.MAX_VALUE);
  }

  private static int blockingMethod(int s) {
    try {
      Thread.sleep(100_000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return s;
  }

What's happening is:

  1. The first 256 elements are printed on main (in doOnNext) as expected.

  2. After around 1 second, the next 256, then the next and so on. Elements from the second batch onwards are printed on elastic threads.

I would expect all the elements to be printed on the main thread. Why do I observe this behaviour? I have also asked this question here on Stack Overflow.

TIA

1 reply
Jorge F. Sánchez
@jfsanchez_gitlab
Hi community, does reactor's HttpClient support SSL client certificates?
Violeta Georgieva
@violetagg
@jfsanchez_gitlab If you configure the client’s SslContext like this isn’t it what you need
1 reply
SslContextBuilder.forClient().keyManager(clientCrtFile, clientKeyFile, clientKeyPassword)
supAllen
@supAllen
The maps are 15 tasks, but I use the following code, and the tasks executed are not necessarily 15 tasks. What is the reason?
private static void fluxTest2(List<Supplier<Map<String, Object>>> maps) {
        Map<String, Object> reduce = Flux.fromIterable(maps)
                .buffer(5)
                .parallel(maps.size() / 5)
                .runOn(scheduler)
                .map(entry -> {
                    long start = System.currentTimeMillis();
                    Map<String, Object> resMap = Maps.newHashMapWithExpectedSize(entry.size());
                    for (Supplier<Map<String, Object>> supplier : entry) {
                        Map<String, Object> realData = supplier.get();
                        resMap.putAll(realData);
                    }
                    System.out.println("step time: " + (System.currentTimeMillis() - start));
                    return resMap;
                })
                .reduce((m1, m2) -> {
                    m1.putAll(m2);
                    return m1;
                })
                .block();

        System.out.println(reduce);
        Map<String, Object> map = reduce;
        Integer t = map.values().stream()
                .map(v -> Integer.parseInt(String.valueOf(v)))
                .reduce(Integer::sum)
                .get();
        System.out.println("size: " + map.size() + "\ttime: " + t);
    }
iron2414
@iron2414
Hy there! I'm trying to create a big zip file (with Java.util.ZipOutputStream) and download it via Spring Webflux. But i'm having problems with the downloading part. First of all, should i return Flux or Mono? It's just one zip file, so Mono seems like a logical choice. But since its a big zip file, i'd like to send the data as soon as its ready, and not to build the whole zipfile in memory. By this logic Flux is the logical choice. Which one do i use? My second problem is , it downloads "response.html" instead of "result.zip" and the content is not good either. What am i missing? Any help is appreciated.
    @GetMapping(produces = "application/octet-stream")
    public Mono<ResponseEntity<FileSystemResource>> getZipeFile(
       ServerHttpRequest request
    ) {
        try {
            FileSystemResource zipFile = new FileSystemResource("result.zip");
            ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile.getFile()));
            out.putNextEntry(new ZipEntry("abc2.txt"));
            out.write("abc123".getBytes());
            out.flush();
            out.closeEntry();
            ResponseEntity<FileSystemResource> response = ResponseEntity
                .ok()
                .cacheControl(CacheControl.noCache())
                .contentType(MediaType.APPLICATION_OCTET_STREAM)
                .header("Content-Disposition", "attachment;filename=result.zip")
                .body(zipFile);
            return Mono.just(response);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // just returning something so it compiles
        return Mono.just(
            ResponseEntity
                .status(400)
                .body(null));
    }
Rob Gillen
@borgille

Can anyone suggest a way to lazily subscribe to a Flux being used to source a unicast sink? I'd like to defer subscribing until after the Flux view of the sink (returned by the method) has been subscribed to:

    private Flux<Integer> getSinkFlux(Flux<Integer> srcFlux) {
        var dstSink = Sinks.many().unicast().<Integer>onBackpressureBuffer();
        var dstFlux = dstSink.asFlux()
            .log("reactor.Flux.DESTINATION", Level.WARNING);

        // todo: defer the following until subscriber on dstFlux
        srcFlux.subscribe(
            srcVal -> {
                dstSink.emitNext(srcVal, Sinks.EmitFailureHandler.FAIL_FAST);
            }
        );
        return dstFlux;
    }

I get a warning in IntelliJ regarding the srcFlux.subscribe in this context, which also makes me suspect I'm doing something incorrect. Thanks!

15 replies
Anna Blasingame
@amblas5687
Hi all,
Does anyone have experience with webflux + AOP? Have used AOP in the past for logging certain data and would like to couple it with a webflux application. Mainly needing to log all method arg data as well as return values.
Anna Blasingame
@amblas5687
Overall what I am trying to do is create a library that will use AOP to log the method args and return values in an automatic fashion. Then this library can be reused by multiple applications, allowing standardized logging. However, I am at a bit of a loss with how to go about accessing the method args and responses without breaking the reactive chain. We need a utility logger because this will need to be done in many microservices. Do you guys have any suggestions about how to go about logging in webflux on an enterprise system?
Scott White
@kibbled
does anyone have a recent example of using CacheMono?
Sagar Raj
@SagarrajRaj_twitter
Hello,
Any really good and efficient implementation of SQS poller using reactor?
Would really appreciate any references
skis
@skis
Hey Guys, I'm looking for some suggestions... I am working with a legacy monolith application. It has a built-in batch. The application is being modified to support multi-cluster architecture and the batch is exposed to external consumption/invocation as a web service. However, the batch cannot run on all the clusters at the same time or in parallel. I'm thinking that perhaps I could use reactor approach to run the batch sequentially on all the servers(one after the other). Does this approach make sense? Note: The monolith cannot be broken down to microservices. Let me know your thoughts.
peterluttrell
@peterluttrell_twitter
Teodor Stanca
@TeodorST_gitlab

Hi all,

I am trying to read messages from webflux server but I can't.
This is my code:

@Test
public void testFindPersonsJson() throws Throwable {
final WebClient client = WebClient
.builder()
.baseUrl("ws://baseUrl_blabla:8080")
.defaultHeaders(httpHeaders -> {
httpHeaders.set("Sec-WebSocket-Key", "xqBt3ImNzJbYqRINxEFlkg==");
httpHeaders.set("Sec-WebSocket-Protocol", "chat, superchat");
httpHeaders.set("Sec-WebSocket-Version", "13");
httpHeaders.set("Sec-WebSocket-Extensions", "permessage-deflate");
httpHeaders.setUpgrade("websocket");
httpHeaders.setConnection("Upgrade");
httpHeaders.setContentType(MediaType.APPLICATION_STREAM_JSON);
}).build();

    final Waiter waiter = new Waiter();
    Flux<String> persons = client.
            get()
            .uri("/ws/persons?authorization=fsdadf_4235fefds-34fdfd")
            .retrieve()
            .bodyToFlux(String.class);
    persons.subscribe(person -> {
        waiter.assertNotNull(person);
        System.out.println("_____________Client subscribes: {}" + person);
        waiter.resume();
    });
    waiter.await(10000, 9);
}

I don't know how to get the response messages. It shoud look like that:
{"messageId":"43567gdjjsdf","messageType":"ABC","messagePayload":{"registered":true,"blocked":false}}

I can see my message on "16:11:04.304 [reactor-http-nio-2] DEBUG reactor.netty.channel.ChannelOperationsHandler - [___ id, L, R] No ChannelOperation attached. Dropping: +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 81 7e 00 99 7b 22 6d 65 73 73 61 67 65 49 64 22 |.~..{"messageId"|
|00000010| 3a 22 33 33 37 31 36 64 61 35 62 61 62 65 61 39 |:"33716da5babea9|" but I can get it to hendle.
...................
but I can get it to hendle.

Seems that reactor.netty.channel.ChannelOperationsHandler logs my message but how can I get them?

Can someone please help my with that?
Thank you.

https://gitter.im/reactor/reactor?at=5d8d04c4290b8c354addc278

5 replies
Ken Kousen
@kousen

I was reading through the Flight of the Flux 3 blog post, and I saw that the recommended strategy for wrapping a blocking call is:

final Flux<String> betterFetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .flatMap(url -> 
             //wrap the blocking call in a Mono
             Mono.fromCallable(() -> blockingWebClient.get(url))
             //ensure that Mono is subscribed in an boundedElastic Worker
             .subscribeOn(Schedulers.boundedElastic())
    ); //each individual URL fetch runs in its own thread!
}

But in the same article they used publishOn instead:

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

Based on that, wouldn't it be simpler to implement the betterFetchUrls method using publishOn, i.e.,

final Flux<String> betterFetchUrls(List<String> urls) {
    return Flux.fromIterable(urls)
        .publishOn(Schedulers.boundedElastic())
        .map(url -> blockingWebClient.get(url));
}

Am I missing something? The Reactor user manual in Appendix C also uses subscribeOn with a Mono, so I presume there's some advantage I'm missing.

Any insight would be appreciated.

4 replies
Steve Storck
@Steve973
Hello. I posted something on stackoverflow about managing multiple Flux instances so that I can wait until they are all complete before processing each of them. Would it be appropriate to link to my SO question? Or should I describe the details here?
10 replies