Kirill Marchuk
@62mkv
ah, ok
by the way, is this a safe way to subscribe to a queue? it works, but I guess it would not be able to re-connect if the connection is lost for some reason?
(as the Receiver is already closed)
Kirill Marchuk
@62mkv
@bsideup yep, thanks! now it logs as expected, [qp-receiver-119]. by the way, is this last number a global counter of threads, not specific to this scheduler?
I guess I have to do some spring cleaning sometime... too many threads
Sergei Egorov
@bsideup
@62mkv the counter is per scheduler
Kirill Marchuk
@62mkv
hmmmmm. there's no chance it could arrange 120 threads in this instance of scheduler
maybe due to devtools restarts..
@bsideup thanks! .connectionMono is new to me..
Jakub
@jmayday

Hi there. I'm still in the process of learning reactive programming. Please help with a simple task: I want to expose an endpoint that simply returns Flux<MyStructure> (the response of some remote call). It's easy, it's just like doing

@GetMapping
public Flux<MyStructure> find() {...
  return remoteService.findAll(param1, param2); //findAll returns Flux<MyStructure>
}

The question is: what if I want to wrap the response in another structure, so that it's not

[{name: name1}, {name: name2}]

but

{data: [{name: name1}, {name: name2}]}

I assume I should first create a wrapper class and return Mono<WrapperClass>. If so, how do I build the response?

Kirill Marchuk
@62mkv
return remoteService.findAll(param1, param2)
    .map(myStructure -> buildNewStructure(myStructure));

NewStructure buildNewStructure(MyStructure myStructure) {
...
}
ah, you want to return a Mono
return remoteService.findAll(param1, param2)
    .collectList()  // Flux<MyStructure> becomes Mono<List<MyStructure>>
    .map(myStructures -> buildNewStructure(myStructures));

NewStructure buildNewStructure(List<MyStructure> myStructures) {
...
}
then
Alexander Wilhelmer
@awilhelmer
Hello!
Is there a way to handle graceful shutdowns in Reactor? Especially when using retryBackoff, I need to be notified on shutdown. doOnCancel doesn't work here. Any suggestions?
Alexander Wilhelmer
@awilhelmer
And no, I'm not in a Spring context ... :(
Jakub
@jmayday
@62mkv so simple as that. Thanks!
Kirill Marchuk
@62mkv
@awilhelmer I am not an expert but maybe you could provide a little more context? what is subscriber, what is publisher, what is expected as a "graceful" shutdown ?
Alexander Wilhelmer
@awilhelmer
@62mkv yes one moment please
Brian Norlander
@norMNfan
Hello! I have been using Reactive programming for about 4 months now and have been looking for a community. Looking forward to engaging with this one!
Mark Paluch
@mp911de
Welcome, Brian!
Alexander Wilhelmer
@awilhelmer

@62mkv - I'm in a Java app with Reactor included. Technically I'm in a Spring application, but without WebFlux and Netty. I'm just using Reactor. That's working fine.
Now I have a web request with retryBackoff. On shutdown it's killed immediately, but I need a hook to mark the request as canceled. An example:

String executionId = execution.getId();
service.doWebRequest(param)
    .retryBackoff(3, Duration.ofSeconds(30), Duration.ofSeconds(30))
    .doOnError(e -> createIncident(executionId, e))
    .doOnCancel(() -> createIncident(executionId))
    .subscribe(response -> {
        ...
        processEngine.getRuntimeService().signal(executionId, callbackPayload);
    });

I need to call createIncident from a shutdown hook ...

Kirill Marchuk
@62mkv
by graceful shutdown you mean SIGTERM or .. ?
Alexander Wilhelmer
@awilhelmer
yes
Kirill Marchuk
@62mkv
looking at spring-projects/spring-boot#4657 I would assume you want your publisher exposed (wrapped) as a @Bean and implement a DisposableBean .. probably
maybe you could do that with subscription as well (or instead)
anyway, I am not sure I understand why an incident related to the subscription (shutdown) should be registered in the same manner as publisher problems
Alexander Wilhelmer
@awilhelmer
because the task was canceled. Yes, that's hacky :D
but still better than a zombie task
Kirill Marchuk
@62mkv
what is the nature of service.doWebRequest ? is it WebClient ?
Alexander Wilhelmer
@awilhelmer
Yes it is
For the background -> i'm trying to use reactor with camunda
Kirill Marchuk
@62mkv
I see there're multiple implementations of ClientHttpConnector, the default one seems to be Netty based. So, might be worth asking in https://gitter.im/reactor/reactor-netty?
maybe connection pool has lifecycle events which you could consume .. somehow
Alexander Wilhelmer
@awilhelmer
so Reactor has no way to handle shutdowns? It could call cancel on all running reactive pipelines instead of doing ... nothing ... :(
Kirill Marchuk
@62mkv
Reactor is a set of abstractions enabling a reactive programming model
epdittmer
@epdittmer
Hi. Does somebody have a solution for the following problem? We are sending pushes towards clients in a fire&forget fashion. However, we would like to keep the pushes - per client - in order. How should this be approached?
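One possible approach to the per-client ordering question, sketched here with plain JDK classes so that it runs without Reactor on the classpath: keep one chain of futures per client and append each new push to the tail of that client's chain. Pushes for the same client then run strictly one after another, while different clients proceed independently. `PerClientSequencer` and `enqueue` are hypothetical names invented for this sketch. In Reactor itself, a comparable effect is usually obtained with `groupBy` on the client id and `concatMap` inside each group, but that variant is not shown here.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical sketch: serialises fire-and-forget sends per client, parallel across clients. */
final class PerClientSequencer {

    // Tail of each client's chain: the future of the most recently queued send.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    /** Queues a send behind the client's previous one and returns its completion future. */
    CompletableFuture<Void> enqueue(String clientId, Supplier<CompletableFuture<Void>> send) {
        // compute() is atomic per key, so chain order equals call order for one client.
        return tails.compute(clientId, (id, tail) -> {
            CompletableFuture<Void> previous =
                    tail == null ? CompletableFuture.completedFuture(null) : tail;
            return previous
                    .exceptionally(error -> null)               // a failed push must not block later ones
                    .thenComposeAsync(ignored -> send.get());   // start only after the previous push finished
        });
    }
}
```

The send is started with `thenComposeAsync`, so it runs on the common fork-join pool rather than inside the map update. The map in this sketch never forgets a client; a real implementation would probably need to evict idle entries.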
Kirill Marchuk
@62mkv
should the burden of processing OS signals lie within it ? not sure, but I am not nearly an expert
Alexander Wilhelmer
@awilhelmer
Reactor will have an event-handling thread, but I'm not an expert either. The possibility to tell Reactor how to handle SIGTERM would be nice ...
Kirill Marchuk
@62mkv
anywhere there's an issue containing "SIGTERM", it's in one of the libraries, built on top of Reactor, which hints at something probably

also see from the channel description:

For advanced questions you can also try #reactor-core

Alexander Wilhelmer
@awilhelmer
i don't know what is advanced :D
Kirill Marchuk
@62mkv
this question certainly is
Alexander Wilhelmer
@awilhelmer
but my question is answered anyway .... it isn't possible at all, except by wrapping my own transactional handling around it...
or simply wait 30*3 seconds :D :D
Kirill Marchuk
@62mkv
it's for older Reactor version, but might give you ideas
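For the shutdown discussion above, one workaround outside Spring is to track in-flight work yourself and cancel it from a JVM shutdown hook. The sketch below uses only the JDK; `InFlightRegistry` and `Cancellable` are hypothetical names, and `Cancellable` merely stands in for Reactor's `Disposable` so that the example compiles without Reactor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: tracks in-flight work so it can be cancelled on JVM shutdown. */
final class InFlightRegistry {

    /** Stand-in for reactor.core.Disposable, so the sketch compiles without Reactor. */
    interface Cancellable {
        void cancel();
    }

    private final List<Cancellable> inFlight = new CopyOnWriteArrayList<>();
    private final AtomicBoolean shutDown = new AtomicBoolean(false);

    /** Registers a running task; run the returned handle once the task finishes normally. */
    Runnable register(Cancellable task) {
        inFlight.add(task);
        return () -> inFlight.remove(task);
    }

    /** Cancels everything still in flight and returns the count; later calls are no-ops. */
    int cancelAll() {
        if (!shutDown.compareAndSet(false, true)) {
            return 0;
        }
        int cancelled = 0;
        for (Cancellable task : inFlight) {
            task.cancel();
            cancelled++;
        }
        inFlight.clear();
        return cancelled;
    }

    /** Installs a JVM shutdown hook; hooks run on SIGTERM and normal exit, not on SIGKILL. */
    void installShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::cancelAll, "cancel-in-flight"));
    }
}
```

With Reactor, the value returned by `subscribe(...)` is a `Disposable`, so something like `registry.register(disposable::dispose)` should fit here: `dispose()` sends a cancel signal upstream, which is the signal `doOnCancel` reacts to. The hook only gets a short window before the JVM exits, so whatever runs on cancel (createIncident in the example above) would need to be quick.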
epdittmer
@epdittmer
Is it possible to create a Publisher from a queue? Or to push to a publisher externally?
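As a rough illustration of pushing into a publisher from the outside, the sketch below uses the JDK's own `java.util.concurrent.SubmissionPublisher` (Java 9+) instead of Reactor, so it runs without extra dependencies. `ExternalPushDemo`, `CollectingSubscriber` and `pushAll` are hypothetical names. In Reactor, `Flux.create` or the `Sinks` API play a similar role, and `Flux.fromIterable` covers the case of an already filled collection.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: items are pushed into a Publisher from outside the pipeline. */
final class ExternalPushDemo {

    /** Collects everything it receives and signals when the stream terminates. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Drains the given items into a publisher and returns what the subscriber saw. */
    static List<String> pushAll(Iterable<String> queue) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);   // subscribe first: earlier items are not replayed
            for (String item : queue) {
                publisher.submit(item);        // blocks while the subscriber's buffer is full
            }
        }                                      // close() signals onComplete after pending items
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }
}
```

Calling `ExternalPushDemo.pushAll(List.of("a", "b", "c"))` hands the three items to the subscriber in submission order. Any thread holding the publisher reference can call `submit`, which is the "push externally" part of the question.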