mplain
@mplain
Why this difference in behavior?
If the server returns a flux of strings and I manually set the content-type to application/json, it makes no difference: the client still receives the response first and the body 5 seconds later, as one concatenated string
mplain
@mplain
what do I need to do to make it return a pretty array of strings?
1 reply
mplain
@mplain
and what's the principal difference between returning a Flux<String> and a Mono<List<String>>?
(provided I don't intend to use content-type application/stream+json or text/event-stream)
2 replies
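
(A minimal sketch of the second shape, assuming a WebFlux annotated controller. As far as I know, Flux<String> goes through the plain CharSequence encoder, which would explain the concatenation you see, while a Mono<List<String>> is serialized by Jackson as one JSON array:)

@GetMapping(path = "/strings", produces = MediaType.APPLICATION_JSON_VALUE)
Mono<List<String>> strings() {
    // Collecting to a list lets Jackson write ["a","b","c"] as one response body.
    return Flux.just("a", "b", "c").collectList();
}
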
iron2414
@iron2414

Hi there, I'm trying to implement a custom queue for a UnicastProcessor inside a Spring application, to handle bursts (I just want to drop incoming messages if there are too many). My problem is, if I do

UnicastProcessor<WebSocketMessage> publisher = UnicastProcessor.create(myCustomqueue);

then my application totally freezes over time, and NOTHING works: there are no more logs or anything like that. However, if I create a UnicastProcessor with no arguments, then everything works fine.

Here's my custom queue implementation, it's really simple:

import java.util.ArrayDeque;

public class LimitedQueue<E> extends ArrayDeque<E> {

    private final int limit;

    public LimitedQueue(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean offer(E o) {
        // Reject new elements once the limit is reached.
        if (size() >= limit) {
            return false;
        }
        return super.offer(o);
    }
}

I found a really similar question from 8 months ago here as well:
https://stackoverflow.com/questions/60193240/reactor-unicastprocessor-drop-on-backpressure-when-queue-is-full

Could anyone help me out?

1 reply
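
(One possible culprit, just a guess: ArrayDeque is not thread-safe, while the processor's default queue is. A hedged alternative that avoids a custom queue altogether is Flux.create with the built-in DROP overflow strategy; attachToSource is a hypothetical hookup to your incoming messages:)

Flux<WebSocketMessage> messages = Flux.create(
        sink -> attachToSource(sink::next),  // hypothetical: register sink.next as the message callback
        FluxSink.OverflowStrategy.DROP);     // drop whatever the subscriber has no demand for
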
Fırat Sivrikaya
@firatsivrikaya
Hey everyone, I have just started to learn Spring Reactor and have been reading the reference documentation, but I'm having a hard time understanding the concepts and find the reference a bit complex for a beginner like me. I also couldn't find real-world examples on the internet. Can you please suggest some resources for better understanding Reactor and WebFlux? Almost all the examples I see on the internet just create a Flux and then subscribe to it. Maybe someone can share open-source real-world projects with tests that I could dive into. I am losing a lot of motivation here :)
Violeta Georgieva
@violetagg
@firatsivrikaya What exactly do you need? There are a lot of resources on the web. Josh Long has a lot of videos that you can check. For example https://youtu.be/_LR0Cxnn-kw?t=1649
Mikael Elm
@mickeelm
Also, if you haven't taken that approach already: read the reference patiently, and don't skip parts or rush through it. IMO it's a great resource then. But I agree, there are many good videos out there from conference talks that cover introductions to WebFlux/Reactor.
iron2414
@iron2414

Hi everyone, I'm trying out reactor-kafka (I'm basically a newbie at it) and got the samples working. However, the consumer keeps spamming the following logs:

2020-11-04 10:01:36.007+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.FetchSessionHandler              : [Consumer clientId=sample-consumer, groupId=sample-group] Node 0 sent an incremental fetch response for session 569816377 with 0 response partition(s), 1 implied partition(s)
2020-11-04 10:01:36.008+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.c.i.Fetcher                      : [Consumer clientId=sample-consumer, groupId=sample-group] Added READ_UNCOMMITTED fetch request for partition test2-0 at position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=abbelfynsrv:9092 (id: 0 rack: null), epoch=0}} to node abbelfynsrv:9092 (id: 0 rack: null)
2020-11-04 10:01:36.008+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.FetchSessionHandler              : [Consumer clientId=sample-consumer, groupId=sample-group] Built incremental fetch (sessionId=569816377, epoch=35) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
2020-11-04 10:01:36.008+0100 DEBUG 3856 --- [-sample-group-1] o.a.k.c.c.i.Fetcher                      : [Consumer clientId=sample-consumer, groupId=sample-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(test2-0)) to broker abbelfynsrv:9092 (id: 0 rack: null)

Is there a way to disable this? I can show my consumer's code if needed.
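
(Those lines come from the Kafka client itself at DEBUG level, so the usual fix is logging configuration rather than anything in reactor-kafka. For example, assuming Spring Boot's logging support, in application.properties:)

logging.level.org.apache.kafka=INFO
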

Jens Geiregat
@jgrgt

Hi all,

I'm developing a Spring Boot Reactor-based application that we're currently performance testing. The basic data flow is:

  1. Fetch a lot of data from the DB using r2dbc-postgresql
  2. Do some heavy calculations on that data
  3. Return an aggregation of that data

Now, we're having issues. When the system is under load, our DB reports responding within < 200ms in most cases, while a lot of our trace data shows that we only receive the first result from the DB in the code after ~1s. And I have no clue where that time difference comes from.

Some things I have tried:

  • moving the CPU-heavy calculations to the parallel scheduler
  • adding artificial buffering to the stream, to be able to measure throughput and avoid backpressure problems - but still, the DB code seems extremely slow.

The only thing left for me to try is to throw out the r2dbc-postgresql repository, replace it with plain old JDBC, and just schedule that on a boundedElastic scheduler?

Any idea is welcome...
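
(For reference, a minimal sketch of that last option — jdbcTemplate and sql are assumed to exist — wrapping the blocking JDBC call so it never runs on the event loop:)

Mono<List<Map<String, Object>>> rows =
        Mono.fromCallable(() -> jdbcTemplate.queryForList(sql))
            // Blocking JDBC work belongs on a scheduler sized for blocking tasks.
            .subscribeOn(Schedulers.boundedElastic());
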

ChandanPadalkar
@ChandanPadalkar
Hi Guys,
I am developing an application using Spring WebFlux with Cassandra as the database. I have a use case where I want to sort data on a particular field, fetch the top 10 records, and display them on the UI. Due to Cassandra limitations I cannot do this directly with a database query.
I have tried fetching all the data from the table, sorting it on the Java layer, and then returning the top 10 records to the UI. Since there are millions of records in the table, fetching everything and sorting on the Java layer takes a significant amount of time.
Is there a better approach to achieve this on the Java layer?
8 replies
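
(One hedged idea for the Java layer: stream the rows reactively and keep only a running top 10 in a small heap, so the full result set is never materialized or fully sorted in memory. Record, getScore and repository.findAll() are hypothetical names:)

Mono<List<Record>> top10 = repository.findAll()   // Flux<Record> from reactive Cassandra
        .reduce(new PriorityQueue<Record>(Comparator.comparing(Record::getScore)),
                (heap, rec) -> {
                    heap.offer(rec);
                    if (heap.size() > 10) {
                        heap.poll();              // evict the current smallest
                    }
                    return heap;
                })
        .map(heap -> {
            List<Record> result = new ArrayList<>(heap);
            result.sort(Comparator.comparing(Record::getScore).reversed());
            return result;
        });
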
Knut Schleßelmann
@kschlesselmann

I've got a library which parses a lot of XML data and requires a callback for what should be done with the parsed data. Now I'm trying to incorporate this lib into my reactive application like this:

private fun InputStream.parse(): Flux<ParsedProduct> = Flux.create { sink ->

    val requested = AtomicLong(0)

    sink.onRequest {
        if (logger.isTraceEnabled) {
            logger.trace("Requested ParsedProducts", kv("number", it))
        }

        requested.addAndGet(it)
    }

    val parser = XmlArchiveParser(ProductParser()) {
        if (logger.isTraceEnabled) {
            logger.trace("Parsed product", kv("productId", it.productId))
        }

        while (requested.get() <= 0) {
            logger.trace { "Waiting for requests" }
            Thread.sleep(5)
        }

        sink.next(it)
        requested.decrementAndGet()
    }

    parser.importAndProcessProducts(this)
    sink.complete()
}

Is this a proper way to implement this hybrid push/pull backpressure mechanism? If so, which thread currently gets blocked by the Thread.sleep? Should I subscribe this stuff on boundedElastic()?

mplain
@mplain
I want to have two router endpoints: one POST that receives some data and sends it to a "channel", and another GET that sends out everything from that channel as Server-Sent Events
I've seen such channels in Go
Is there something similar in Reactor?
3 replies
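
(A minimal sketch of that shape with the Sinks API, handler wiring assumed: the sink plays the role of the Go channel, the POST handler emits into it, and the GET handler exposes it as an SSE stream:)

Sinks.Many<String> channel = Sinks.many().multicast().onBackpressureBuffer();

Mono<ServerResponse> post(ServerRequest request) {
    return request.bodyToMono(String.class)
            .doOnNext(channel::tryEmitNext)        // push into the "channel"
            .then(ServerResponse.accepted().build());
}

Mono<ServerResponse> events(ServerRequest request) {
    return ServerResponse.ok()
            .contentType(MediaType.TEXT_EVENT_STREAM)
            .body(channel.asFlux(), String.class); // stream everything out as SSE
}
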
mplain
@mplain

I'm using Spring WebClient (based, AFAIK, on the reactor-netty HttpClient?)
I'm sending a stream of 5 values with a one-second delay (Flux<Object>, content-type = application/x-ndjson) to the server, and receiving the response as a Flux as well
I expect to see something like this in the logs:

object #1 sent
response #1 received
object #2 sent
response #2 received
...

Objects are sent one by one, and responses are also received one by one,
but WebClient starts receiving the response only after it has finished sending the request
I've found a similar question on SO: https://stackoverflow.com/questions/52113235
In it, @bclozel says that Spring WebClient and the Reactor Netty HttpClient only start processing the response after they have finished processing the request
Also, @violetagg told me that if the Netty server sends a response before it has finished processing the request body, then the remaining request body is discarded (?), as per the HTTP spec (?)
Could you please confirm that this is the way WebFlux and Reactor Netty work by design?

Interestingly, another answer on SO says that the Jetty-based WebClient works differently: it is able to start receiving the response before it finishes sending the request
Is the restriction on the client side, the server side, or both?

Violeta Georgieva
@violetagg

@mplain

Also, @violetagg told me that if the Netty server sends a response before it has finished processing the request body, then the remaining request body is discarded (?), as per the HTTP spec (?)

And this is true if you finalise the response, not if you just start writing the response

For example, this code will start echoing the received incoming data and will continue to do so until the last message from the client is received:
HttpServer server =
        HttpServer.create()
                  .port(PORT)
                  .route(r -> r.post("/echo",
                          (req, res) -> res.header(CONTENT_TYPE, TEXT_PLAIN)
                                           .send(req.receive().retain())));
Violeta Georgieva
@violetagg
And what @bclozel replied in the mentioned SO question is so true - there is no guarantee that the sequence above that you expect will happen, especially nowadays with so many proxies and load balancers between the client and the server

then the remaining request body is discarded (?), as per the HTTP spec (?)

https://tools.ietf.org/html/rfc7230#section-6.3

A server MUST read
   the entire request message body or close the connection after sending
   its response, since otherwise the remaining data on a persistent
   connection would be misinterpreted as the next request.
mplain
@mplain
thank you!
I'm trying out reactor-kafka with BlockHound
BlockHound barks at KafkaConsumer.poll being a blocking method
I've found a github issue about it: reactor/reactor-kafka#63
should I move that flux to another scheduler to make the dog happy, or is there no point, really?
11 replies
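
(If the poll turns out to be acceptable in context, a hedged alternative to moving schedulers is to whitelist that one call; class and method names here are the ones BlockHound reports:)

BlockHound.builder()
        .allowBlockingCallsInside("org.apache.kafka.clients.consumer.KafkaConsumer", "poll")
        .install();
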
Knut Schleßelmann
@kschlesselmann
I have a publisher that only gets executed if I have acquired a Lock beforehand. After execution the Lock has to be released. Now, sadly, doFinally executes after the completion of the Publisher, so there is no way (that I know of) that I can test this behavior. What would you suggest I do to handle this?
11 replies
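
(A hedged sketch with Mono.usingWhen, which ties acquire/release to the publisher's lifecycle and, unlike doFinally, runs the async cleanup before the terminal signal is propagated downstream — so the release is observable in a test. acquireLock, doWork and releaseLock are hypothetical:)

Mono<Result> guarded = Mono.usingWhen(
        acquireLock(),               // Mono<Lock>: asynchronous acquisition
        lock -> doWork(),            // the publisher that needs the lock
        lock -> releaseLock(lock));  // async release, completes before downstream sees onComplete
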
Jeevjyot Singh Chhabda
@jeevjyot
Hi,
I have a List<String> and a Mono<Map<String,Object>>. I want to look up each entry of the List<String> in the Mono<Map<String,Object>>. What is the right way to achieve this in reactive programming?
3 replies
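
(A minimal sketch, with loadMap as a hypothetical source: resolve the Mono once, then look every key up in the resulting map:)

List<String> keys = List.of("a", "b", "c");
Mono<Map<String, Object>> mapMono = loadMap();

Flux<Object> values = mapMono.flatMapMany(map ->
        Flux.fromIterable(keys)
            .filter(map::containsKey)   // keep only keys present in the map
            .map(map::get));            // emit the looked-up values
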
Rahul Narula
@ranarula

Hi, I have a handler method in which I need to persist data in the DB and, once it is persisted, send out some events (an HTTP call via the reactive WebClient) with the persisted data. I want to return control as soon as the DB operation is completed; the sending of events needs to happen as a background job. I am having trouble assembling the Mono operator chain for this. Here is what I have:

Mono<ServerResponse> handleRequest(ServerRequest request) {
    var persistedMono = Mono.just("Hello")
        .flatMap(this::persistData)
        .doOnError(this::sendErrorEvent);

    persistedMono.publishOn(Schedulers.parallel())
        .doOnSuccess(this::sendEvents).subscribe();

    return persistedMono.flatMap(result -> ServerResponse.ok()
        .body(BodyInserters.fromValue(result)));
  }

The problem here is that persistedMono is subscribed to twice, so persistData is called twice.

How can I address this issue and still send the events in the background?

Thanks in advance for any suggestions/solution

9 replies
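
(A hedged sketch of one way out: keep a single chain — one subscription — and fire the background publisher from a side effect. It assumes sendEvents returns a Mono<Void>; alternatively, .cache() on persistedMono would make the second subscription replay the result instead of re-persisting:)

Mono<ServerResponse> handleRequest(ServerRequest request) {
    return Mono.just("Hello")
            .flatMap(this::persistData)
            .doOnError(this::sendErrorEvent)
            .doOnNext(result -> sendEvents(result)      // hypothetical: Mono<Void>
                    .subscribeOn(Schedulers.parallel())
                    .subscribe())                       // fire-and-forget background job
            .flatMap(result -> ServerResponse.ok()
                    .body(BodyInserters.fromValue(result)));
}
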
rbrose
@rbrose
How can I solve this with only one SELECT query?
    public Flux<TopicCommentResponse> topicComments(String language, long topicId) {
        return databaseClient.execute("" +
                "SELECT tc.*, u.username FROM topic_comment tc JOIN users u on tc.user_id = u.id WHERE tc.topic_id=:topicId AND tc.parent_id is null order by tc.created_at desc")
                .bind("topicId", topicId)
                .as(TopicCommentResponse.class)
                .fetch()
                .all()
                .flatMap(comment ->
                     Mono.just(comment)
                            .zipWith(databaseClient.execute("" +
                                    "SELECT tc.*, u.username FROM topic_comment tc JOIN users u on tc.user_id = u.id WHERE tc.topic_id=:topicId AND tc.parent_id=:parentId order by tc.created_at desc")
                                    .bind("topicId", topicId)
                                    .bind("parentId", comment.getId())
                                    .as(TopicCommentResponse.class)
                                    .fetch()
                                    .all().collectList())
                            .map(tupla ->  tupla.getT1().withAnswers(tupla.getT2()))
                );
    }
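
(A hedged sketch of a single-query variant: select parents and replies together, then group the replies under their parents in memory. It assumes a getParentId() accessor on TopicCommentResponse:)

public Flux<TopicCommentResponse> topicComments(String language, long topicId) {
    return databaseClient.execute(
            "SELECT tc.*, u.username FROM topic_comment tc JOIN users u ON tc.user_id = u.id " +
            "WHERE tc.topic_id=:topicId ORDER BY tc.created_at DESC")
            .bind("topicId", topicId)
            .as(TopicCommentResponse.class)
            .fetch()
            .all()
            .collectList()
            .flatMapMany(all -> {
                // Group replies by their parent comment id.
                Map<Long, List<TopicCommentResponse>> byParent = all.stream()
                        .filter(c -> c.getParentId() != null)
                        .collect(Collectors.groupingBy(TopicCommentResponse::getParentId));
                // Emit only top-level comments, each with its replies attached.
                return Flux.fromIterable(all)
                        .filter(c -> c.getParentId() == null)
                        .map(c -> c.withAnswers(byParent.getOrDefault(c.getId(), List.of())));
            });
}
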
Dan Cohen-Smith
@dancohensmith
hey, I see a new reactor-kafka release
does this mean the backpressure bug is fixed?
6 replies
does it actually respect backpressure now?
R2J
@rajeev-jha1988
does Spring R2DBC provide support for JPA Specification queries?
Mritunjay Dubey
@mddubey

Hello people!

I am trying to use reactive RabbitMQ in my Spring WebFlux application, but I am facing issues. Following the tutorial from the Reactor RabbitMQ reference guide, this is what I am trying to do:

sender.send(Mono.just(message))
      .doOnSuccess(a -> System.out.println("Delivered"))
      .doOnError(e -> logger.error("error", e))
      .subscribe();

Please let us know what might be going wrong! I have not been able to find a solution on the internet.

1 reply
Rob Gillen
@borgille
Hi everyone! I'm trying to wrap my head around something that is probably simple, but I haven't found a way to do it - and maybe there is a better way than what I am thinking of. I would like to collect the values emitted by a Flux into a PriorityQueue, to order them before emitting them to downstream subscribers. This seems like it could be achieved through a custom Processor, but there seems to be a consensus discouraging that. If not via a custom Processor, then how?
12 replies
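
(If the requirement is simply "emit in order once the source completes", a hedged alternative that avoids a custom Processor entirely is collectSortedList; source here is a hypothetical Flux<Integer>:)

Flux<Integer> ordered = source
        .collectSortedList(Comparator.naturalOrder())  // buffer and sort on completion
        .flatMapMany(Flux::fromIterable);              // re-emit in sorted order
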
vireshwali
@vireshwali

Hi everyone. I am new to Spring Reactor and need some help implementing a simple use case. I have two monos (mono1 and mono2) of the same type Mono<List<Type1>>. Type1 looks like this:

public class ErrorCountsByShortNameDto implements Serializable {
    private String errorShortName;
    private String deploymentName;
    private Long errorCount;
}

Now I need to loop over the lists in the two monos and match the Type1 items whose errorShortName is the same. Can anyone point me in the right direction, or to an older message here that does this?

vireshwali
@vireshwali
any help with the ^^ one?
Ismail Marmoush
@IsmailMarmoush
Hi, I'm using Mono.fromFuture(cf). How do I keep repeating this mono (something like .repeat()) but stop as soon as cf isn't producing output? You can consider cf a reactive API for a stream, but since it does that only once, I don't know how to keep polling for data and stop when the stream is finished.
4 replies
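
(One hedged pattern: defer a fresh future per attempt and wrap values in an Optional so "no more output" becomes a visible sentinel to stop on. pollNext(), returning CompletableFuture<Optional<String>>, is hypothetical:)

Flux<String> stream = Mono.defer(() -> Mono.fromFuture(pollNext()))
        .repeat()                        // resubscribe, i.e. poll again, after each value
        .takeWhile(Optional::isPresent)  // stop as soon as the source is exhausted
        .map(Optional::get);
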
Ismail Marmoush
@IsmailMarmoush
@vireshwali I guess you'd zip the monos and map the tuple applying the matching function
1 reply
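
(A minimal sketch of that zip-and-match idea, assuming a getErrorShortName() accessor on the DTO:)

Mono<List<ErrorCountsByShortNameDto>> matched = mono1.zipWith(mono2)
        .map(tuple -> tuple.getT1().stream()
                .filter(left -> tuple.getT2().stream()
                        .anyMatch(right -> right.getErrorShortName()
                                .equals(left.getErrorShortName())))
                .collect(Collectors.toList()));
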
bruto1
@bruto1
a question for @bsideup, I guess? How do you serialize signal emissions with a Replay sink to avoid EmitResult.FAIL_NON_SERIALIZED?
19 replies
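
(For context, a hedged sketch of the retry-on-contention approach: an EmitFailureHandler that returns true asks the sink to try the emission again, so concurrent emitters spin briefly instead of failing:)

sink.emitNext(value, (signalType, emitResult) ->
        emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED);  // true = retry the emission
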
mplain
@mplain
I'm using BlockHound and kotlinx-coroutines-debug
BlockHound.install() throws a blocking call error for webClient.<...>.awaitBody()
BlockHound.builder().with(CoroutinesBlockHoundIntegration()).install() does not throw
could there be a conflict with some other basic BlockHound integration?
29 replies
dowgiallom
@dowgiallom

Hello all. I'm trying to use the Sinks API to pass signals to consumers in a certain way, but it looks like I'm doing something incorrectly. Could somebody help / take a look?

What I would like to do is:

sink = Sinks.many().replay().limit(EVENT_AGE_LIMIT);

and then expose it as:

sink.asFlux()

I wrote a simple test that ensures that a new subscriber still receives an event that is less than EVENT_AGE_LIMIT old (let's say 2000ms):

        // arrange
        CountDownLatch received = new CountDownLatch(1);
        Sinks.Many<Object> sink = Sinks.many().replay().limit(Duration.ofMillis(2000));

        // act
        Sinks.EmitResult emittedResult = sink.tryEmitNext("event A");

        Thread.sleep(1800);

        sink.asFlux().publishOn(Schedulers.boundedElastic()).subscribe(e -> received.countDown());

        // verify
        assertThat(emittedResult.isSuccess()).isTrue();
        assertThat(received.await(200, TimeUnit.MILLISECONDS)).isTrue();

The test fails at the assertion on the last line. However, if I change the sink to Sinks.Many<Object> sink = Sinks.many().replay().all(); then it passes.
Does anybody know what I could be doing wrong?

16 replies
kaladhar
@kaladhar-mummadi
Hi,
Is there a way to skip the stack trace (the 2nd part of the log, with all the reactor.core.publisher details) in the logs when using ReactorDebugAgent.init()?
Thanks in advance
turbolag
@RahulKushwaha
Hi,
I am writing an application where I need to acquire a lock before running a bunch of operations. All of these are asynchronous operations, fully using reactive streams. I am trying to leverage doFirst to acquire the lock and doFinally to release it. Is this the right pattern, or is there a more idiomatic way to do it?
Ismail Marmoush
@IsmailMarmoush

@RahulKushwaha What a strange coincidence - I'm doing exactly the same thing right now and was about to ask whether it's all right or not.
Except that I'm not using doFirst, because doFirst runs even before subscription, which as far as I understand happens right away; that means your code would lock even before it's subscribed to, and I doubt you'd want that.

Here's my code

@Override
public <V> Mono<V> transaction(Id id, Callable<V> action) {
    return Mono.fromCallable(action)
        .doOnSubscribe(s -> {
            // One lock per id; acquire it as soon as someone subscribes.
            locks.putIfAbsent(id, new ReentrantLock());
            locks.get(id).lock();
        })
        .doFinally(f -> locks.get(id).unlock());
}

Hope one of the experts here would let us know if this is abuse of reactor or a totally fine thing.
P.S I'm locking on a certain id through a map.

1 reply
Ugnius Rumsevicius
@UgiR
won't you be blocking with the locks?
Ismail Marmoush
@IsmailMarmoush
@UgiR I guess so; my only excuse is that this function is part of an "in-memory" DB with add() and get() operations, and I needed a way to synchronize updates on a certain Id.
Can you think of a better way?
Mister_Jesus
@Bittuw

Hello everybody, why does this code not work as expected?

@Test
@SneakyThrows
void simple() {
    final var emit = Sinks.many().multicast().onBackpressureBuffer();
    Flux.interval(Duration.ofSeconds(5), Schedulers.newSingle("special-timer")).log()
            .filterWhen(ll -> emit.asFlux().next().hasElement()).log()
            .subscribeOn(Schedulers.newSingle("under-test")).subscribe(ll -> System.out.println(ll));

    for (int i = 0; i <= 100; i += 1) {
        emit.tryEmitNext(0L);
        TimeUnit.SECONDS.sleep(10);
    }
}

I thought that each subscription in the filterWhen statement would wait for one element, but actually only the first subscription completes. All the others keep waiting for an element...

1 reply
Prashant Pandey
@limafoxtrottango

I need some help understanding the threading behaviour of Mono.fromCallable(..). I have this code:

  public static void main(String[] args) throws InterruptedException {
    Flux.range(1, 100_000)
        .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
        .flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic()))
        .subscribe();
    System.out.println("Here");
    Thread.sleep(Integer.MAX_VALUE);
  }

  private static int blockingMethod(int s) {
    try {
      Thread.sleep(100_000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return s;
  }

What's happening is:

  1. The first 256 elements are printed on main (in doOnNext) as expected.

  2. After around 1 second, the next 256, then the next and so on. Elements from the second batch onwards are printed on elastic threads.

I would expect all the elements to be printed on the main thread. Why do I observe this behaviour? I have also asked this question here on Stack Overflow.

TIA

1 reply
Jorge F. Sánchez
@jfsanchez_gitlab
Hi community, does reactor's HttpClient support SSL client certificates?
Violeta Georgieva
@violetagg
@jfsanchez_gitlab If you configure the client's SslContext like this, isn't it what you need?

SslContextBuilder.forClient().keyManager(clientCrtFile, clientKeyFile, clientKeyPassword)
1 reply
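
(For completeness, a hedged sketch of plugging that SslContext into the reactor-netty HttpClient; the file and password variables are assumed, and checked-exception handling is omitted:)

SslContext sslContext = SslContextBuilder.forClient()
        .keyManager(clientCrtFile, clientKeyFile, clientKeyPassword)
        .build();
HttpClient client = HttpClient.create()
        .secure(spec -> spec.sslContext(sslContext));  // use the client certificate for TLS
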
supAllen
@supAllen
The maps list contains 15 tasks, but when I use the following code, the number of tasks executed is not always 15. What is the reason?
private static void fluxTest2(List<Supplier<Map<String, Object>>> maps) {
        Map<String, Object> reduce = Flux.fromIterable(maps)
                .buffer(5)
                .parallel(maps.size() / 5)
                .runOn(scheduler)
                .map(entry -> {
                    long start = System.currentTimeMillis();
                    Map<String, Object> resMap = Maps.newHashMapWithExpectedSize(entry.size());
                    for (Supplier<Map<String, Object>> supplier : entry) {
                        Map<String, Object> realData = supplier.get();
                        resMap.putAll(realData);
                    }
                    System.out.println("step time: " + (System.currentTimeMillis() - start));
                    return resMap;
                })
                .reduce((m1, m2) -> {
                    m1.putAll(m2);
                    return m1;
                })
                .block();

        System.out.println(reduce);
        Map<String, Object> map = reduce;
        Integer t = map.values().stream()
                .map(v -> Integer.parseInt(String.valueOf(v)))
                .reduce(Integer::sum)
                .get();
        System.out.println("size: " + map.size() + "\ttime: " + t);
    }