josh gruenberg
@joshng

hi there. I'd appreciate some advice: I have a flux of timestamped events (from kafka), and I need to collate them into event-time windows, with a configured accumulation timeout. the events aren't strictly ordered, but I can tolerate windows closing prematurely: stragglers that arrive after their previously-open window has closed can just be allocated into a new, separate window for the same time period.

I seek an appropriate strategy for achieving this without leaking resources or degrading performance as the cardinality of event-time windows grows without bound.

at first glance, something like the following seems like it might work:

events.groupBy(event -> event.timestamp().truncatedTo(windowDuration))
        .flatMap(group -> group.window(windowAccumulationTimeout))

however, I'm concerned about the implications of groupBy for that unbounded stream of groups. I suspect this might be problematic because each group will remain in the groupBy state forever, even after the window-timeout expires for the last straggling event in each window.

am I wrong to be concerned about this? is there a better way to do what I need?
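One hedged idea (untested sketch, building on the code above): completing each group after the accumulation timeout with take(Duration) cancels the group, which lets groupBy evict its state for that key; a later straggler for the same key then starts a fresh group, matching the "new, separate window" tolerance. windowUnit stands in for a ChronoUnit matching windowDuration:

```java
// Sketch: take(...) completes/cancels each group after the accumulation
// timeout, so groupBy state does not grow without bound; a straggler for the
// same time bucket re-creates a group.
events.groupBy(event -> event.timestamp().truncatedTo(windowUnit))
      .flatMap(group -> group
          .take(windowAccumulationTimeout)
          .collectList());   // one List<Event> per (possibly re-opened) window
```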

7 replies
Salathiel Genèse
@SalathielGenese

Hey guys

I trust you all are thriving.

For reusability concerns, I run my validation on the service layer, which returns Mono.error( constraintViolationException )...

So that my web handlers merely forward the unmarshalled domain to the service layer.

So far, so great.


But how do I advise (AOP) my web handlers so that they return HTTP 422 with the formatted constraint violations?

WebExchangeBindException only handles exceptions thrown synchronously.

My AOP advice triggers an error because:

  • my web handler returns Mono<DataType>
  • but my advice returns a ResponseEntity

And if I wrap my response entity (from the advice) into a Mono<ResponseEntity>, I get an HTTP 200 OK with the response entity serialized :(

Salathiel Genèse
@SalathielGenese
The link to the SO question, with code excerpt - https://stackoverflow.com/q/69519724/3748178
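A hedged alternative to AOP advice (untested; names are illustrative): in WebFlux, an error signalled through the returned Mono is routed to @ExceptionHandler methods, so a @RestControllerAdvice can map ConstraintViolationException to a 422:

```java
// Sketch: turns a ConstraintViolationException signalled via Mono.error(...)
// into an HTTP 422 carrying the formatted violations.
@RestControllerAdvice
class ConstraintViolationAdvice {
    @ExceptionHandler(ConstraintViolationException.class)
    ResponseEntity<Map<String, String>> onViolation(ConstraintViolationException e) {
        Map<String, String> body = e.getConstraintViolations().stream()
                .collect(Collectors.toMap(
                        v -> v.getPropertyPath().toString(),
                        ConstraintViolation::getMessage));
        return ResponseEntity.unprocessableEntity().body(body);
    }
}
```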
Zane XiaoYe
@Zane-XY
Hi, I'm working on rewriting a periodic pulling job (using the @Scheduled annotation) into a reactive stream. The system pulls messages from a message queue (not Kafka). The queue is thread-safe, and the previous implementation uses multiple clients/threads to do the pulling. After the messages are pulled, we need to do some processing (enrichment, sending to another queue), then ack back to the message queue.
What is the equivalent operator/API to do the same work using Reactor?
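A hedged sketch of one equivalent shape (queueClient, enrich, and forward are hypothetical stand-ins, not a real API):

```java
// Sketch: wrap the blocking pull in a Mono, repeat() to keep pulling, and
// ack only after processing succeeds.
Mono.fromCallable(queueClient::pull)
    .subscribeOn(Schedulers.boundedElastic())        // pulling may block
    .repeat()                                        // pull again after each message
    .flatMap(msg -> enrich(msg)                      // enrichment
        .flatMap(enriched -> forward(enriched))      // send to another queue
        .then(Mono.fromRunnable(() -> queueClient.ack(msg))))
    .subscribe();
```

The multiple pulling threads of the old implementation could be approximated by merging several such pipelines, or by a concurrency argument on flatMap.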
Daniil Mikhaylov
@mdsina

Hi there!
I have a question: why is it recommended to run blocking code in Mono.fromRunnable() or Mono.fromCallable() rather than in Mono.defer() or Mono.flatMap()?
I mean, if I look at the sources of Mono.fromRunnable(), it actually does much the same as Mono.defer():

    public void subscribe(CoreSubscriber<? super T> actual) {
        MonoRunnableEagerSubscription s = new MonoRunnableEagerSubscription();
        actual.onSubscribe(s);
        if (s.isCancelled()) {
            return;
        }
        try {
            run.run();
            actual.onComplete();
        } catch (Throwable ex) {
            actual.onError(Operators.onOperatorError(ex, actual.currentContext()));
        }
    }

The blocking code runs in the subscribe() method. Doesn't the Supplier in Mono.defer() do the same?
Does it matter if I switch threads with the .subscribeOn() operator?
Also, MonoSubscribeOn gives me pause: it does worker.schedule(() -> s.request(n)); on the parent when subscribe happens, but I can't see why request() is scheduled on the worker and subscribe() isn't.
Thanks
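For illustration, a hedged sketch (blockingCall() is a hypothetical blocking method): both variants defer the call until subscription, and subscribeOn then moves that subscription onto a scheduler:

```java
// Neither Mono runs blockingCall() until subscribed; subscribeOn moves the
// subscription (and hence the call) onto the boundedElastic scheduler.
Mono<String> viaCallable = Mono.fromCallable(() -> blockingCall());
Mono<String> viaDefer    = Mono.defer(() -> Mono.just(blockingCall()));

viaCallable.subscribeOn(Schedulers.boundedElastic()).subscribe();
viaDefer.subscribeOn(Schedulers.boundedElastic()).subscribe();
```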

Red444
@MiloS444

Hi all,

I have a function that deletes a user both from the database and Keycloak. Sometimes it happens that the user gets deleted from the database but remains in Keycloak. Does anyone have an idea what the problem could be? It only happens from time to time, and I don't know why.

        return _companyRepository.findById(companyId)
            .flatMap(companyDocument -> {
                final List<Employees> employees = companyDocument.getEmployees();
                if (employees != null && !employees.isEmpty())
                {
                    final boolean isRemoved =
                        employees.removeIf(employee -> employee.getId().equals(employeeId));
                    if (isRemoved)
                    {
                        companyDocument.setEmployees(employees);
                        return _companyRepository.save(companyDocument).thenReturn(employeeId);
                    }
                }
                return Mono.just(employeeId);
            })
            .flatMap(id -> _keycloakService.deleteUser(employeeRealm, id))
            .onErrorResume(e -> Mono.empty()); // note: this swallows deleteUser failures silently
Salathiel Genèse
@SalathielGenese
And a StackOverflow +50 bounty
Zakir
@Zaky7

Hi, I am using SSE in a Spring WebFlux application. I have created an SSE endpoint that gets data from Redis pub-sub, and the Redis publisher gets its data from an external WebSocket connection.

My problem is that when there is no data from the WebSocket, then as soon as I try to connect to my SSE endpoint, the connection terminates due to the unavailability of data. My hunch is that I need to send comment heartbeats when data is not coming. How can I do that?

@PostConstruct
private fun realtimeSinkInitializer() {

        /**
         * onBackpressureBuffer by default cancels the subscription when the
         * last subscriber cancels; we set autoCancel to false
         */
        realtimeSink = Sinks.many().multicast().onBackpressureBuffer(1000, false)


        reactiveMsgListenerContainer.receive(weatherTopic)
            .onBackpressureBuffer(1000)
            .map(ReactiveSubscription.Message<String, String>::getMessage)
            .map { msg ->
                objectMapper.deserialize<List<WeatherDto>>(msg)
            }
            .subscribe(
                { weatherData: List<WeatherDto> ->

                    realtimeSink.tryEmitNext(weatherData)

                },
                { err ->
                    logger.error(err) { "Error occurred redis subscription channel" }
                }
            )
    }

 fun getRealtimeWeather(): Flux<ServerSentEvent<Map<String, WeatherDto>>> {
        return realtimeSink.asFlux()
            .map { e -> ServerSentEvent.builder(e).build() }.onErrorResume { e ->
                logger.error(e) { "Error getting realtime weather data" }
                Flux.error(InternalServerException())
            }
    }
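One hedged way to do this (a Java sketch, untested): merge a periodic comment-only ServerSentEvent heartbeat into the data stream, so something is always written even when no data arrives. dataEvents stands in for the mapped sink flux above:

```java
// Sketch: a comment-only SSE every 15 seconds keeps the connection alive.
// The 15-second period is an arbitrary choice.
Flux<ServerSentEvent<Object>> heartbeat = Flux.interval(Duration.ofSeconds(15))
        .map(i -> ServerSentEvent.builder().comment("keep-alive").build());

Flux<ServerSentEvent<Object>> stream = Flux.merge(dataEvents, heartbeat);
```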
2 replies
Mantas Matūzas
@mantoshelis
Hello, I'm trying to integrate reactive and non-reactive code. The use case is to have a non-reactive event listener call a reactive service. Do you think it's fine to do something like this?
  @EventListener
  public void handle(TheEvent event) {
    reactiveEventService.handle(event).subscribe();
  }
Salathiel Genèse
@SalathielGenese

First...

The subscribe() method returns a Disposable... If you do not keep a reference to that object, when do you call its dispose() method?

Second...

Even if you do, it doesn't really make the whole thing reactor-compliant

What you need (I think)
is a publisher [maybe within your service] to which you can PUSH TheEvent as they come in.
The handle( TheEvent ) method from your excerpt pushes events and does not subscribe to anything.
Salathiel Genèse
@SalathielGenese
Your reactiveEventService will be responsible for subscribing to that publisher OR, if it exposes that publisher (Mono, Flux or else), whatever code wants to listen to it
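A hedged sketch of that suggestion (untested; TheEvent and reactiveEventService come from the excerpt above, the rest is illustrative):

```java
// Sketch: the listener only pushes into a sink owned by the service; the
// service holds the single long-lived subscription.
private final Sinks.Many<TheEvent> events = Sinks.many().unicast().onBackpressureBuffer();

@EventListener
public void handle(TheEvent event) {
    events.tryEmitNext(event);                  // push, no per-event subscribe()
}

@PostConstruct
void startPipeline() {
    events.asFlux()
        .flatMap(reactiveEventService::handle)  // the reactive handling
        .subscribe();                           // one subscription for the app's lifetime
}
```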
Mantas Matūzas
@mantoshelis
@SalathielGenese makes sense. I will try it.
Salathiel Genèse
@SalathielGenese
+1
Anand Jaisy
@anandjaisy
```
public DiscountCommand create(Mono<String> insertId, DiscountCommand discountCommand) {
    return insertId.map(item -> new DiscountCommand(
                    item,
                    discountCommand.discountCode(),
                    discountCommand.discountType()))
            .block(Duration.of(1000, ChronoUnit.MILLIS));
}
```
If insertId is null, it throws a NullPointerException and the application hangs indefinitely. How can I handle this error?
Salathiel Genèse
@SalathielGenese
1./ You may want to annotate your parameter as non null

2./ You can use if/else OR Optional

Optional.ofNullable( insertId )
  .map( __ -> insertId.map( item -> new /* ... */ ) )
  .orElseGet( Mono::empty )
  .block(/* duration */);

Something like that (untested)

9 replies
Raghu
@code-uri
Hello, how can I emit an item received through a REST endpoint without a blocking call to emit? I want to emit the item to a sink and return the item as the response.
```
Sinks.Many<String> inputSink = Sinks.many().multicast().onBackpressureBuffer();

public Mono<String> process(String uid) {
    inputSink.emitNext(uid, (signalType, emitResult) -> false);
    return Mono.just(uid);
}
```
Raghu
@code-uri
Basically, I want to process the item received via the REST endpoint asynchronously. I would like to subscribe to the processed items on another thread as shown below.
I am subscribing to inputSink, processing each item, and emitting to outputSink.
Sinks.Many<String> outputSink = Sinks.many().unicast().onBackpressureBuffer();

public Flux<String> getProcessedItems() {
    return outputSink.asFlux();
}
Is this approach correct?
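A hedged sketch of the middle step the two excerpts imply (processItem is hypothetical):

```java
// Sketch: consume inputSink on another thread, process each item, and emit
// the result to outputSink.
inputSink.asFlux()
    .publishOn(Schedulers.boundedElastic())   // move work off the request thread
    .map(uid -> processItem(uid))             // hypothetical processing
    .subscribe(outputSink::tryEmitNext);
```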
ruslan
@unoexperto

Hi folks. I'm trying to use Sinks.many().unicast().onBackpressureBuffer() for the first time and I'm getting an unexpected result. Here is how I use it:

// on Thread B
flushingSink.tryEmit(...)

flushingSink.asFlux()
  .subscribeOn(THREAD-POOL-A)
  .map {
    // doing work here
    // executed on Thread B, instead of THREAD-POOL-A
  }

I see that flushingSink.tryEmit is blocking AND queued messages are processed on the calling thread. I thought they would be queued asynchronously. What am I doing wrong?
Thanks!!
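A hedged note, with a sketch: subscribeOn only moves the subscription and request path; with a hot sink, values are emitted on the producing thread, so moving the downstream work onto another pool is publishOn's job:

```java
// Sketch: publishOn hops emissions onto the target scheduler, so map() no
// longer runs on the emitting (Thread B) side. threadPoolA is an illustrative
// ExecutorService, doWork a hypothetical method.
flushingSink.asFlux()
    .publishOn(Schedulers.fromExecutorService(threadPoolA))
    .map(value -> doWork(value))   // now runs on THREAD-POOL-A
    .subscribe();
```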

3 replies
ruslan
@unoexperto

Does anybody know if passing Mono/Flux around in the project is the right way of doing chaining? It seems like I'm losing performance on all these Reactor wrappers. In the profiler I see a call stack like this one: https://snipboard.io/fXT6KW.jpg

I see calls as deep as 150 nested Reactor invocations.

1 reply
ruslan
@unoexperto
Hi folks! What is the right answer to this question ? https://stackoverflow.com/questions/31185129/parallel-dispatch-of-groupby-groups-in-reactor
Basically I'm trying to do the same - I want to process some work in parallel, always preserving the original order per group (even if a group changes threads later).
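A hedged sketch of the usual shape for this (Item::key and process are hypothetical):

```java
// Sketch: groups run concurrently with each other, while concatMap preserves
// the original order within each group, even as the group hops threads.
source.groupBy(Item::key)
    .flatMap(group -> group
        .publishOn(Schedulers.parallel())
        .concatMap(item -> process(item)))
    .subscribe();
```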
6 replies
Dmitry
@dshuplyakov
Hi there!
I want to read a message and then execute an HTTP request. If the HTTP request returns 2xx, ack the message; otherwise, nack it. But the problem is that flatMap returns Mono<ResponseEntity> and I can't pass the original AMQP message to the map step.
RabbitFlux.createReceiver().consumeManualAck(QUEUE_NAME, consumeOptions)
        .flatMap(amqpMessage -> {
            String url = new String(amqpMessage.getBody());
            return webclient.get().uri(url).retrieve().toBodilessEntity();
        })
       .map(responseEntity -> {
           if (responseEntity.getStatusCode().is2xxSuccessful()) {
               // how to pass here amqpMessage?
               amqpMessage.ack();    
           } else {
               amqpMessage.nack();
           }
           return amqpMessage;
        }).subscribe();
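A hedged sketch of one fix: keep the message in scope by nesting the response handling inside the flatMap (note that reactor-rabbitmq's nack takes a requeue flag):

```java
// Sketch: the response is handled where amqpMessage is still in scope.
// Note: retrieve() turns 4xx/5xx into error signals, so nack-on-error would
// additionally need an onErrorResume around the inner pipeline.
RabbitFlux.createReceiver().consumeManualAck(QUEUE_NAME, consumeOptions)
    .flatMap(amqpMessage -> {
        String url = new String(amqpMessage.getBody());
        return webclient.get().uri(url).retrieve().toBodilessEntity()
            .doOnNext(responseEntity -> {
                if (responseEntity.getStatusCode().is2xxSuccessful()) {
                    amqpMessage.ack();
                } else {
                    amqpMessage.nack(false);   // false = do not requeue
                }
            })
            .thenReturn(amqpMessage);
    })
    .subscribe();
```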
1 reply
harshdave840
@harshdave840
Hi everyone. I have 2 hot Fluxes which are running in parallel, and one Mono. I want the Mono to always execute after both Fluxes have completed. I tried to concat the first Flux and the Mono, but the Mono executes immediately after the first Flux has completed. How do I make sure that the Mono always runs after both parallel Fluxes have completed? Below is just an example.

var sourceFlux= Flux.range(1,10);
var hotflux1=sourceFlux.publish().refCount(2);
var firstFlux=hotflux1.map(i->i*2).log("First");

//this is a io blocking call
var secondFlux=hotflux1
.publishOn(Schedulers.boundedElastic()).map(i->i*5).log("Second");
secondFlux.subscribe();
//

var firstMono=Flux.just(300).log("First Mono");
var concatedFlux=Flux.concat(firstFlux,firstMono);
concatedFlux.subscribe();

3 replies
In reality, the first Flux is calling some service, the second Flux is a DB call, and the Mono is also a service call. The Mono requires data from both the first and the second Flux before calling its service.
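A hedged sketch (untested, and the refCount(2) wiring above would need adjusting so the subscription counts still line up): Mono.when completes only when all its sources complete, which gives the ordering guarantee:

```java
// Sketch: the final Mono runs only after both fluxes have completed.
// serviceMono stands in for the dependent service call.
Mono.when(firstFlux, secondFlux)
    .then(serviceMono)
    .subscribe();
```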
Ben Tomasini
@btomasini

I am using an approach similar to this to lift a reactor context into ThreadLocal for use by a grpc client interceptor.

https://stackoverflow.com/questions/47501349/using-hooks-and-a-lift-to-push-context-into-threadlocal

I am getting some stale values due to some bugs in the way I am using it. My main concern with this approach is its dependence on the onNext call to lift a fresh context into the ThreadLocal. If used incorrectly - namely not doing a contextWrite on a stream, it can lead to leaked values from other threads.

One way I thought of to add more safety would be to clear the ThreadLocal in the onComplete or onError of the subscriber. From my initial inspection this appears to work because, so far, onComplete and onError are invoked on the same thread as onNext.

Question: Is it guaranteed that onComplete and onError are called on the same thread as onNext for a given subscriber like this?

1 reply
L.J.
@liujingbreak

Is there an exhaustMap operator in reactor-core?

I have the same question: I combed through the Reactor 3 API docs and can't find an equivalent operator; Google led me to this channel and conversation.
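For reference, a hedged approximation only (not an exact exhaustMap; buffering semantics differ): a concurrency-1 flatMap combined with dropping upstream overflow ignores new items while the current inner publisher is running:

```java
// Rough sketch: with concurrency 1, flatMap requests one item at a time, and
// onBackpressureDrop discards items arriving while there is no demand.
// handle(...) is a hypothetical mapper returning a Publisher.
source.onBackpressureDrop()
      .flatMap(item -> handle(item), 1)
      .subscribe();
```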

Marek Andreansky
@skyhirider
Can you recommend a well written open source library that utilizes Reactor? I would like to read through some real code to see how I could utilize Reactor.
Salathiel Genèse
@SalathielGenese
Spring Webflux !?
Netty ?
John Meehan
@n0rb3rt
Looking for a way to chunk a Flux<ByteBuffer> into specified chunk sizes in bytes. Does such a solution already exist somewhere?
bruto1
@bruto1

Looking for a way to chunk a Flux<ByteBuffer> into specified chunk sizes in bytes. Does such a solution already exist somewhere?

concatMap and external state for buffer remainders less than specified size?

John Meehan
@n0rb3rt
That's the path I'm headed down. Coming from Akka Streams and Scala, keeping external state seemed like the wrong approach, but I see from other examples that it's not out of the norm here. Something like this?
    public static Flux<ByteBuffer> chunker(Flux<ByteBuffer> data, int chunkSize) {
        return Flux.defer(() -> {
            AtomicLong countDown = new AtomicLong(chunkSize);
            return data.bufferUntil(buf -> {
                if (countDown.addAndGet(-buf.remaining()) <= 0L) {
                    countDown.set(chunkSize);
                    return true;
                }
                return false;
            }).concatMap(bufList -> {
                int size = bufList.stream().mapToInt(ByteBuffer::remaining).sum();
                ByteBuffer merged = bufList.stream()
                        .reduce(ByteBuffer.allocate(size), ByteBuffer::put);
                merged.flip(); // reset position so the merged buffer is readable downstream
                return Mono.just(merged);
            });
        });
    }
4 replies
kerr
@hepin1989
Hi, how do I implement throttling in Reactor?
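A hedged sketch of two common options, depending on what kind of throttling is meant:

```java
// Sketch: delayElements enforces wall-clock spacing between items (simple
// rate limiting); limitRate only caps the size of backpressure request batches.
source.delayElements(Duration.ofMillis(100))   // at most ~10 items per second
      .subscribe();

source.limitRate(64)                           // request at most 64 items at a time
      .subscribe();
```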
Daniel Wahlqvist
@offroff
Can I do a nested groupBy? I have equipment, and for each piece of equipment I have different metrics. So I would like to do a groupBy per equipment and then an inner groupBy per type of metric. That might be the wrong approach of course, but I'm mostly trying to get started and learn.
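Structurally it composes; a hedged sketch (Event::equipmentId, Event::metricType, and the aggregation are hypothetical), with the usual caveat that groupBy works best with low group cardinality:

```java
// Sketch: outer groupBy per equipment, inner groupBy per metric type.
events.groupBy(Event::equipmentId)
    .flatMap(byEquipment -> byEquipment
        .groupBy(Event::metricType)
        .flatMap(byMetric -> byMetric
            .map(Event::value)
            .reduce(Double::sum)))   // hypothetical per-metric aggregation
    .subscribe();
```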
2 replies
josh gruenberg
@joshng

is there a straightforward way to achieve a graceful flush/shutdown with an infinite source? I have a Flux from a KafkaReceiver, and at shutdown, I want to stop receiving messages from upstream, but allow all of the in-flight work to complete, and then allow the KafkaReceiver to commit its offsets (via the provided ReceiverOffset.acknowledge callback), before the KafkaReceiver closes its underlying kafka consumer. dispose seems to propagate upstream immediately, canceling all buffered work... and it also induces the KafkaConsumer to be closed before the offsets associated with that work can be committed.

I think this might naturally take the form of arranging for the request signals flowing upstream to the KafkaReceiver to be disabled at shutdown time, but I haven't found a simple way to inject that behavior into the wiring. I'm using a pretty hacky solution that involves publish to a ConnectableFlux, with a bogus consumer that stops requesting ... but this seems pretty heavyweight and unnatural. Any suggestions?

3 replies
Zakir
@Zaky7

Hi everyone, I have been using Mono.zip a lot in my application, and since I come from a Node.js background I was thinking it works like Promise.all, but I recently found that with Mono.zip, if one of the Monos completes empty, the zip won't run.

reproduction

fun main() {
    val m1 = Mono.just("A")
    val m2 = Mono.just("B")
    val m3 = Mono.empty<String>()

    Mono.zip(m1,m2,m3).map {
        println("Data t1: ${it.t1} t2: ${it.t2} t3: ${it.t3}")
    }.subscribe()
}

It doesn't print anything

workaround

fun main() {
    val m1 = Mono.just("A").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m2 = Mono.just("B").map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    val m3 = Mono.empty<String>().map { Optional.of(it) }.defaultIfEmpty(Optional.empty())

    Mono.zip(m1,m2,m3).map {
        println("Data t1: ${it.t1.get()} t2: ${it.t2.get()} t3: ${it.t3.orElse(null)}")
    }.subscribe()
}

My requirement is that I have a couple of Monos and I need the output of all of them even if one is empty. If any of the Monos fails, it can throw an error.

@OlegDokuka. any view regarding this buddy

1 reply
Salathiel Genèse
@SalathielGenese
I think you're doing it right, Zakir, since a null value will result in an empty Mono
3 replies
Daniel Wahlqvist
@offroff
I have a flux and I want to dynamically tell it how to transform the data. My idea is to join the flux with another "command flux" and adapt the processing depending on the last command. Is this idea similar to a real, tested pattern someone else has used? I suspect you guys can give me some hints. What I need to do is figure out how to implement this, and I need to validate whether it's a good pattern at all.
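A hedged sketch of one established match for this idea: Flux.withLatestFrom pairs each data item with the most recent command seen so far (Command::apply is a hypothetical transformation):

```java
// Sketch: each data item is combined with the latest command, so the
// processing adapts as new commands arrive.
dataFlux.withLatestFrom(commandFlux, (data, command) -> command.apply(data))
        .subscribe();
```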
3 replies
Daniel Wahlqvist
@offroff
I've been reading the API documentation and feel a little overwhelmed but also impressed and curious. Now I'm wondering; what is Reactor? Sure, I get it's a reactive programming framework but what I haven't anticipated is what such a framework must evolve into, if it's successful. I didn't realize this before but now I believe a successful reactive programming framework MUST in the long run evolve into a successful stream processing framework. That might be a strange claim, and I know that's not what many people believe. E.g. this attitude is common: https://www.tikalk.com/posts/2017/12/06/streaming-vs-reactive/ Given all this; what is Reactor and what is it evolving into?
Salathiel Genèse
@SalathielGenese
That article just sparks confusion
The term “stream” [...] is a generic term that applies to Java Streams, Observables and even Iterators.
1 reply
Kirill Marchuk
@62mkv
Hi all. I am a bit confused by Gradle Module Metadata. The .pom artifact from here: https://repo1.maven.org/maven2/io/projectreactor/reactor-core/3.3.5.RELEASE/ says "published-with-gradle-metadata", but the folder for this release (same URL) does not contain any .module file. How come?
SiwyDym
@SiwyDym
Hi! I have a problem using an ExchangeFilterFunction in WebClient. I declared a filter that runs a backoff retry on error, but for WebClientResponseException it doesn't work: the retry mechanism doesn't run. Unlike WebClientRequestException, for which it does work. What am I doing wrong?
//creating client 
var client = WebClient.builder()
               .defaultHeader(CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .baseUrl(serviceUrl)
                .filter(decorateFunction)
                .build();
(...)
//example of using
webClient.delete().uri(deleteUrl).retrieve()
            .bodyToMono(XXX.class)
            .doOnSuccess(consumeSuccess())
            .doOnError(consumeError())
            .subscribe();
(...)
// `decorateFunction` is like below
public interface ReactiveRequestDecorator extends ExchangeFilterFunction {

    /**
     * Decorate the given request.
     *
     * @param request the request to decorate
     * @param <T> the type of result returned
     * @return decorated request
     */
    <T> Mono<T> decorate(Mono<T> request);

    @Override
    default Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        return decorate(next.exchange(request));
    }

}

// example implementation
(...)
@Override
public <T> Mono<T> decorate(Mono<T> request) {
        return Mono.defer(() -> request)
                .retryWhen(Retry.backoff(3, Duration.ofMillis(200)));
    }
(...)
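A hedged guess at the cause, with a sketch: next.exchange(...) completes successfully even for error HTTP statuses; WebClientResponseException is only raised later by retrieve()/bodyToMono, downstream of the filter, so the filter's retry never sees it (a failed connection, by contrast, surfaces as WebClientRequestException inside the exchange). Converting error statuses to errors inside the filter would make them retryable there:

```java
// Sketch: surface error statuses as errors inside the filter so retryWhen
// can react to them; createException() builds a WebClientResponseException.
@Override
default Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
    return decorate(next.exchange(request)
            .flatMap(response -> response.statusCode().isError()
                    ? response.createException().flatMap(Mono::error)
                    : Mono.just(response)));
}
```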
1 reply