luvarqpp
@luvarqpp
@Jingon-Park have you put some blocking in your main thread, e.g. System.in.read() or Thread.sleep(sufficientlyLongTime)? The problem can be that you create your pipeline and Reactor starts processing things (i.e. the first delay), but your main thread finishes in milliseconds and so the process exits with 0. Any processing in-flight is cancelled.
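A minimal, self-contained sketch of that advice, using a CountDownLatch instead of a fixed sleep (the names and timings here are illustrative):

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

import reactor.core.publisher.Flux;

public class KeepMainAlive {
    static final List<Integer> seen = new CopyOnWriteArrayList<>();

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        Flux.range(1, 3)
            .delayElements(Duration.ofMillis(100))  // runs on a daemon scheduler thread
            .doOnNext(seen::add)
            .doFinally(signal -> latch.countDown()) // release main on complete/error/cancel
            .subscribe();
        try {
            latch.await(); // without this, main returns before anything is emitted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

doFinally releases the latch on any terminal signal, so the process also exits cleanly on error.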
bartoszolczyk
@bartoszolczyk

Hi guys. I have a big struggle on my side and maybe someone, somehow, knows the answer or at least a lead to hang on to :D
To the point: I have an environment on Spring Boot where I store Fluxes of defined WebClients (that connect to different hosts) in a map, to connect to and get data from.
If someone from the front-end side asks for the same source of data, I return data from the cache instead of creating a new WebClient instance :)

It works like a charm, but everything collapses when the ip/port is changed on the instance: then users cannot get data from the stored WebClient.
I've thought about several solutions to the problem:

  • detect no active subscribers and delete it from the map
  • schedule a cleanup action - to remove old unused / failed fluxes
  • define action in back propagation mechanism to swap connection details

In the code below I have put the definition of the cache manager.

I would be very grateful for any kind of help or advice on how to handle this hard cookie :D

    public abstract class CachedFluxReactiveManager<T, M extends FiltreableCommand> extends CachedReactiveManager<T, M, Flux<T>> {

        protected final Map<ReactiveCallsKey, Flux<T>> connections = new ConcurrentHashMap<>();
        protected final FluxClientCreator<T> clientCreator;
        protected final InstancesQueryCreator queryCreator;
        private final boolean useCache;

        protected CachedFluxReactiveManager(InstancesCache instancesCache, FluxClientCreator<T> clientCreator,
                InstancesQueryCreator queryCreator, boolean useCache) {
            super(instancesCache);
            this.clientCreator = clientCreator;
            this.queryCreator = queryCreator;
            this.useCache = useCache;
        }

        @Override
        public Flux<T> getData(M command) {
            if (useCache) {
                if (command.isFiltered()) {
                    return createAndReturnConnection(command).next().flux();
                }
                return ofNullable(getConnectionIfExists(command))
                        .orElseGet(() -> createAndReturnConnection(command));
            } else {
                return createAndReturnConnection(command);
            }
        }

        @Override
        protected Flux<T> getConnectionIfExists(M command) {
            return connections.get(
                    getReactiveCallsKey(command.getApplicationName(), command.getTicker(), command.getReferenceEx()));
        }
    }
luvarqpp
@luvarqpp
@bartoszolczyk I do not see a problem at first look at your code. What triggers questions in my head is the use of next().flux() in only one of the three usages of createAndReturnConnection: createAndReturnConnection(command).next().flux(). Why is that so?
bartoszolczyk
@bartoszolczyk

@luvarqpp thanks for asking, and my apologies for not showing the actual Reactor implementation. Due to some big brain time I've split the Flux implementation into several smaller ones (they are in the spot below, where you are asking):

    @Override
    protected ReactiveCallImpl createConnectionDetails(MMFilterableCommand command) {

        final var instance = getInstance(command.getApplicationName());
        return ReactiveCallImpl.builder()
                .applicationName(instance.getApplicationName())
                .ticker(command.getTicker())
                .url(getUrl(instance))
                .reference(command.getReferenceEx())
                .body(MarketMakerRequestBody.builder()
                        .source(instance.getExchangeName())
                        .reference(command.getReferenceEx())
                        .ticker(command.getTicker())
                        .referenceFeed(command.getReferenceEx())
                        .limit(0)
                        .build())
                .build();
    }
    protected String getUrl(InstanceInfoDto instance) {
        return UriComponentsBuilder.newInstance()
                .host(instance.getHostIp())
                .scheme("http")
                .port(instance.getHttpPort())
                .pathSegment("orders")
                .toUriString();
    }

and here (after the mentioned method) I'm adding some flavor to manage the fluxes:

    protected Flux<T> manageConnectionConfiguration(ReactiveCall reactiveCall,
            Flux<T> out) {
        return out.doOnError(throwable -> {
                            connections.remove(new ReactiveCallsKey(reactiveCall.getApplicationName(), reactiveCall.getTicker(),
                                    reactiveCall.getReference()));
                            log.error("connection error for {} reactive source",
                                    reactiveCall.getApplicationName(),
                                    new OperationException(systemExceptionMessage, HttpStatus.INTERNAL_SERVER_ERROR,
                                            throwable));
                        }
                )
                .doOnSubscribe(
                        subscription -> log.debug("subscription established for {} ", reactiveCall.getApplicationName()))
                .doOnComplete(() -> log.debug("subscription completed for {} ", reactiveCall.getApplicationName()))
                .doOnError(throwable -> {
                    log.error("exception occurred connection for {}  {}", reactiveCall.toString(),
                            ExceptionUtils.getStackTrace(throwable));
                    connections.remove(
                            new ReactiveCallsKey(reactiveCall.getApplicationName(), reactiveCall.getTicker(),
                                    reactiveCall.getReference()));


                })
                .doFinally(vinChangedEventSignal -> log.debug("client connection terminated for {}",
                        reactiveCall.getApplicationName()));
    }
Jingon-Park
@Jingon-Park

Hi all,
I want to delay Flux elements, but the Java process finished with exit code 0:

public static void main(String[] args) {
    Flux.range(1, 10)
        .delayElements(Duration.ofMillis(1000))
        .flatMap(data -> {
            log.info("test : {}", data);
            if (data == 5) {
                return Flux.error(new RuntimeException("test"));
            }
            return Flux.just(data);
        })
        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
        .onErrorResume(e -> {
            log.info("resume");
            return Flux.empty();
        })
        .repeat()
        .subscribe();
}

That is my code; I intend the Flux stream to be infinite, but it is not working.
Why did my Java process finish?
Please help me.

1 reply
Harman Singh
@Harmannz

Hi team, I am confused by expectNoEvent in StepVerifier.
The following test passes as expected.

  @Test
  public void testWithDelayElements() {
    StepVerifier.withVirtualTime(() -> Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(1)))
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(1))
        .expectNext(1)
        .thenCancel()
        .verify();
  }

expectNoEvent correctly throws an AssertionError if we set the duration in delayElements to less than 1 second. This makes sense, since the StepVerifier receives an event sooner than the provided duration.

But if we remove delayElements from the flux, we would expect the test to fail, since the flux emits the 1 event immediately, right?

  @Test
  public void testWithoutDelayElements() {
    StepVerifier.withVirtualTime(() -> Flux.just(1, 2, 3))
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(1))
        .expectNext(1)
        .thenCancel()
        .verify();
  }

However, the test passes. I expected an expectNoEvent assertion error to be thrown. Am I missing something?

2 replies
amit306
@amit306
I am looking for a Mono.repeatWhenEmpty example. I couldn't understand what the second argument (the repeat factory) is.
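For reference, a sketch of how the repeat factory works: it receives a companion Flux<Long> that emits each time the source completed empty, and the publisher you return from it decides if and when to resubscribe (the delay/take choices below are just illustrative):

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

public class RepeatWhenEmptyDemo {
    static final AtomicInteger subscriptions = new AtomicInteger();
    static String result;

    public static void main(String[] args) {
        // Source completes empty on the first two subscriptions, then emits a value.
        Mono<String> source = Mono.defer(() ->
                subscriptions.incrementAndGet() < 3 ? Mono.empty() : Mono.just("ready"));

        result = source
                // Each emission from the returned publisher triggers a resubscribe;
                // completing the companion (take) gives up and completes empty.
                .repeatWhenEmpty(companion -> companion
                        .delayElements(Duration.ofMillis(10)) // back off between attempts
                        .take(5))                             // at most 5 extra attempts
                .block();

        System.out.println(result);
    }
}
```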
Guillaume DROUET
@gdrouet
Hello, what is the best approach to consume two sources in parallel and get the values from only one? Something like when(), but I need a Flux from one of the sources as the result.
ruioliveiraz
@ruioliveiraz
Hello :)
I'm building a reactive app which uses Spring Events to propagate events in an async way (using ApplicationEventMulticaster, or ApplicationEventPublisher with @Async on the event listeners). Both solutions do not work properly, as sometimes I do not see the events being triggered. Do you know if Spring Events are compatible with spring-reactor? If not, could you please tell me what the alternative to Spring Events on spring-reactor is?
Thanks in advance
Firas
@firas1220
Hello guys! I'm new here 😇 anybody please tell me how to discuss topics here ,
Ashok Koyi
@thekalinga

I am using WebSockets with the reactor-netty library, and after a minute or two the session gets terminated silently and the application gets stuck, without any error, in the middle of messages being exchanged between HttpClient and the WebSocket server using WebsocketInbound and WebsocketOutbound. This happens while the debug log shows HttpClient is exchanging WebsocketFrames. Once this happens, everything gets stuck silently (the frame exchange halts suddenly).

There is no error in the console, no CloseWebSocketFrame (WebsocketInbound#receiveFrames), no WebSocketCloseStatus (WebsocketInbound#receiveCloseStatus), nothing. It just gets stuck, and I get debug statements saying 0 active connections and 0 inactive connections, whereas just a second before the log shows it receiving binary frames from the server.

Did anyone else face this issue? How can I debug what's causing this termination?

Ashok Koyi
@thekalinga
@violetagg Any idea on what's the process to debug this odd issue? (The connection is being removed from the connection pool automatically while messages were still being exchanged between both ends.) Appreciate any help in this regard.
51 replies
ZbiegniewKap
@ZbiegniewKap
Hi! I'm looking for a technology I can use in my reactive project. I'm interested in Reactor, but I'm not sure if it's good for my project. So here are my questions to you:

  1. Is the scheduler ready to be installed in HA?
  2. Does it have a queueing model implemented?
  3. Does it store tasks (events, definitions)?
  4. Does it remove triggers?
  5. Are the requests cyclic?
  6. Does it have timed triggers?

I would be very grateful for help with this! And thank you for your time!
ghilainm
@ghilainm
Hi, can I update the context based on the result of a mono execution?
James Howe
@jhoweaa

I'm new to Reactor (and reactive programming) and I have a question about error handling that I hope someone can help with.

I have some Reactor code which uses flatMap to make several asynchronous HTTP calls. After the flatMap, we process the responses to build a final result to return to a user. It is possible that some, or all, of these calls may fail. The most common reason would be that something about the input request was invalid. We want to capture all of the responses, including the error responses.

Looking at the Reactor API, I was hoping to find a method which could substitute a replacement value when an error occurred in a flatMap operation but continue processing the rest of the pipeline; however, this doesn't seem to exist. onErrorContinue lets me ignore the error and maybe log data, but I can't push a replacement for the bad item into the pipeline. onErrorResume lets me start a new Flux, but I want the rest of my flatMap to run. I just want to replace the bad result(s) with some error result.

My current code does the following:

    Function<Request, Publisher<ObjectNode>> requester = serviceClient.getAnalysis().apply(authToken, user);
    return
      Flux.fromStream(requestStream(noOfRequests, new Random()))
        .flatMap(requester)
        .onErrorContinue((ex, response) -> log.error(ex.getClass().getCanonicalName() + ": " + ex.getMessage()))
        .collectList();

The 'requester' is a function that takes a request and makes an Http call which returns a Publisher. The 'requestStream' method generates a set of test requests, some of which may cause 400 level errors when the request is processed. The expected result is a list of ObjectNode objects which contain JSON which is returned to the user.

What I would like to be able to do is handle the 400 errors, create an 'error JSON' object to replace the bad response, as well as log the original exception. Since I'm new to Reactor and reactive programming, I may be overlooking an obvious solution.

Thanks!
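One pattern that matches this requirement is to move the error handling inside the flatMap, so each inner publisher resumes with a fallback element while the outer sequence keeps going. A self-contained sketch (the request/response strings and the `call` helper are stand-ins, not the poster's actual API):

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PerElementFallback {
    static List<String> results;

    // Stand-in for the HTTP call; fails for the "bad" request.
    static Mono<String> call(String req) {
        return "bad".equals(req)
                ? Mono.error(new IllegalArgumentException("400 for " + req))
                : Mono.just("{\"data\":\"" + req + "\"}");
    }

    public static void main(String[] args) {
        results = Flux.just("ok-1", "bad", "ok-2")
                .flatMap(req -> call(req)
                        // Resume on the INNER publisher: only the failed element is
                        // replaced, and the rest of the flatMap keeps running.
                        .onErrorResume(ex -> Mono.just("{\"error\":\"" + ex.getMessage() + "\"}")))
                .collectList()
                .block();
        results.forEach(System.out::println);
    }
}
```

The same shape works with ObjectNode instead of String: build an "error JSON" node in the onErrorResume lambda and log the original exception there.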

Coney
@coney

I'm using Reactor 3.4.18 and have a question about Flux.groupBy. I have generated 1000 integers and split them into 100 groups. I expect that each group could be processed in a separate thread, but it hangs after several integers are processed.

    @Test
    void shouldGroupByKeyAndProcessInParallel() {
        final Scheduler scheduler = Schedulers.newParallel("group", 1000);

        StepVerifier.create(Flux.fromStream(IntStream.range(0, 1000).boxed())
                .groupBy(integer -> integer % 100)
                .flatMap(groupedFlux -> groupedFlux
                        .subscribeOn(scheduler) // this line doesn't help
                        .doOnNext(integer -> log.info("processing {}:{}", groupedFlux.key(), integer)),
                    2)
            )
            .expectNextCount(1000)
            .verifyComplete();
    }
10:47:58.670 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
10:47:58.846 [group-1] INFO com.huawei.hwclouds.coney.spike.FluxGroupByTest - processing 0:0
10:47:58.866 [group-1] INFO com.huawei.hwclouds.coney.spike.FluxGroupByTest - processing 1:1
10:47:58.867 [group-1] INFO com.huawei.hwclouds.coney.spike.FluxGroupByTest - processing 0:100
10:47:58.867 [group-1] INFO com.huawei.hwclouds.coney.spike.FluxGroupByTest - processing 1:101
10:47:58.867 [group-1] INFO com.huawei.hwclouds.coney.spike.FluxGroupByTest - processing 0:200
10:47:58.867 [group-1] INFO com.huawei.hwclouds.coney.spike.FluxGroupByTest - processing 1:201
-------- start hanging ----------

I have changed the flatMap concurrency to 2 to speed up the reproduction.
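For context on the hang: groupBy's javadoc notes that it works best when all groups are consumed downstream. With a flatMap concurrency (2 here) smaller than the number of simultaneously open groups (100), the unconsumed groups back up groupBy's internal buffer and the whole pipeline stalls. A sketch that avoids the stall by keeping concurrency at or above the group count (same 1000-element input as above):

```java
import java.util.stream.IntStream;

import reactor.core.publisher.Flux;

public class GroupByConcurrency {
    static long count;

    public static void main(String[] args) {
        count = Flux.fromStream(IntStream.range(0, 1000).boxed())
                .groupBy(i -> i % 100)
                // flatMap concurrency must be >= the number of simultaneously open
                // groups (100 here); otherwise unconsumed groups fill groupBy's
                // buffer and the pipeline stalls, as in the test above.
                .flatMap(group -> group, 128)
                .count()
                .block();
        System.out.println(count);
    }
}
```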

Pradeep Kumar
@Pradeep17210238_twitter
Hi folks, I am trying to read the request body in Spring Cloud Gateway before the request reaches the actual downstream services. I know that the request can be consumed only once, but is there any way to do this?
HaloFour
@HaloFour
Is there any documentation to help in writing efficient operators that can participate in operator optimization, fusion, etc.? A lot of the helper classes/interfaces used by Reactor's own operators are package-private.
jayanthpatki91
@jayanthpatki91
Hi guys, I am trying to demonstrate the concept of backpressure in reactive streams, and I want to observe/log the effect of backpressure in an endpoint. Any ideas as to how I can demo the effect?
Ocean_Moist
@ocean-moist:matrix.org
[m]
How do I combine multiple fluxes of type T into another flux of type T[]? Details here: https://stackoverflow.com/questions/74049770/best-way-to-combine-multiple-fluxt-into-fluxt/74051618#74051618
Dexter Huang
@dexterhuang-blockone
Hi, are there any use cases demonstrating the Sinks.EmitResult.FAIL_CANCELLED status? - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Sinks.EmitResult.html#FAIL_CANCELLED
Nick Caballero
@nickcaballero
Hi, what's up with the BlockHound project? Is anyone still watching issues/PRs there? https://github.com/reactor/BlockHound
2 replies
yohasakur
@yohasakur
Hi, I am using reactor-rabbitmq. The problem I have is that my receiver does not recover or catch the exception when there is a RabbitMQ delivery acknowledgement timeout: "Channel error on connection <####> : operation none caused a channel exception precondition_failed: consumer ack timed out on channel 1". The receiver thinks that the connection is still open, but RabbitMQ has already closed the connection.
Manvendra Singh
@manvendra

Hi,
how do I make sure each Flux item is processed in parallel when chaining multiple Flux calls?

Here is the code to explain the problem:


@RestController
public class ParallelCheckController {

    @GetMapping("/items")
    List<Item> searchItems() {

        Map<BackendModule, List<Long>> idsByBackend = new HashMap<>(10);
        idsByBackend.put(new BackendModuleOne(),Arrays.asList(11L,12L,13L));
        idsByBackend.put(new BackendModuleTwo(),Arrays.asList(21L,22L,23L));
        idsByBackend.put(new BackendModuleThree(),Arrays.asList(31L,32L,34L));

        //TODO: I want to call all backend concurrently in parallel.
       //But all Modules are called on same thread.
        return Flux
                .fromIterable(idsByBackend.entrySet())
                .publishOn(Schedulers.boundedElastic())
                .flatMap(e->e.getKey().searchItems(e.getValue()))
                .collectList()
                .block();
    }
}

interface BackendModule {
    abstract Mono<Item> searchItem(Long itemId);

    default Flux<Item> searchItems(List<Long> itemIds) {
        sleep(); log(itemIds);
        //TODO: I want to call all searchItem call concurrently in parallel.
        //But all Items from one module going on same thread.
        return Flux
                .fromIterable(itemIds)
                .publishOn(Schedulers.parallel())
                .flatMap(this::searchItem);
    }
    //To mimic the actual code latency
    default void sleep() {
        try {Thread.sleep(1000);}
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    //To see the threads in logs
    default void log(Object obj) {
        System.out.println(
                Thread.currentThread().getName()
                        + " " + this.getClass().getSimpleName()
                        + " " + obj
        );
    }
}
 class BackendModuleOne implements BackendModule {
    @Override
    public Mono<Item> searchItem(Long itemId) {
        sleep(); log(itemId);
        return Mono.just(new Item(itemId));//This is a call to backend WebClient
    }
}
 class BackendModuleTwo implements BackendModule {
    @Override
    public Mono<Item> searchItem(Long itemId) {
        sleep();log(itemId);
       return Mono.just(new Item(itemId));//This is a call to backend WebClient
    }
}
 class BackendModuleThree implements BackendModule {
    @Override
    public Mono<Item> searchItem(Long itemId) {
        sleep();log(itemId);
        return Mono.just(new Item(itemId));//This is a call to backend WebClient
    }
}
@Data
class Item {
    private final Long id;
}

Here is a StackOverflow link with more explanation:
https://stackoverflow.com/questions/74279446/how-to-make-sure-each-item-in-reactor-flux-is-processed-in-parallel
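For what it's worth, the usual way to get each inner call onto its own worker is to apply subscribeOn inside the flatMap; publishOn before the flatMap only moves the outer emissions onto a single worker. A stripped-down sketch of that placement (the sleep stands in for the backend latency in the code above):

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ParallelFanOut {
    static final Set<String> threads = ConcurrentHashMap.newKeySet();
    static List<Long> results;

    public static void main(String[] args) {
        results = Flux.fromIterable(List.of(1L, 2L, 3L, 4L))
                .flatMap(id -> Mono.fromCallable(() -> {
                            threads.add(Thread.currentThread().getName());
                            Thread.sleep(200); // simulate a slow backend call
                            return id;
                        })
                        // subscribeOn INSIDE the flatMap: each inner Mono is
                        // subscribed on its own worker, so the calls overlap.
                        .subscribeOn(Schedulers.boundedElastic()))
                .collectList()
                .block();

        System.out.println(results.size() + " results on " + threads.size() + " threads");
    }
}
```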

153952402
@153952402
hi
Abhishek S
@abhisheksms97_gitlab
hello
datadidit
@datadidit

Hi,

I'm looking for some help to figure out the best way to stop sending messages as soon as I get a PoolAcquirePendingLimitException.

package datadidit;

import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException;
import reactor.netty.resources.ConnectionProvider;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class NettyWebClientBackPressureIT {
    private Logger LOG = LoggerFactory.getLogger(NettyWebClientBackPressureIT.class);

    private MockWebServer server;

    private String mockUrl;

    @BeforeEach
    public void setup() throws IOException {
        server = new MockWebServer();
        server.start();

        mockUrl = "http://localhost:" + server.getPort();
        LOG.info("URL:{}", mockUrl);
    }

    /***
     * Scenario slow to response server, simulating handling pendingAcquireMaxCount
     */
    @Test
    public void testClientBackPressure() throws InterruptedException {
        /**
         * Make a connection pool with small queue to easily simulate scenario
         */
        ConnectionProvider.Builder builder = ConnectionProvider.builder("custom")
                .maxConnections(1)
                .pendingAcquireMaxCount(3);

        HttpClient client = HttpClient.create(builder.build());
        WebClient webClient = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(client))
                .build();
        /**
         * Add some mock responses with slow response times
         */
        for (int i = 0; i < 10; i++) {
            server.enqueue(new MockResponse()
                    .setBody("Hello World")
                    .setHeader("Content-Type", "text/plain")
                    .setBodyDelay(1, TimeUnit.SECONDS));
        }

        for (int i = 0; i < 10; i++) {
            Mono<String> result = getResult(webClient);

            result.subscribe(x -> LOG.info("Response: {}", x));
        }

        Thread.sleep(10000L);
    }

    public Mono<String> getResult(WebClient webClient) {
        Mono<String> result = webClient.get()
                .uri(mockUrl)
                .retrieve()
                .bodyToMono(String.class)
                .doOnError(WebClientRequestException.class, error -> {
                    if (error.getMostSpecificCause() instanceof PoolAcquirePendingLimitException) {
                        LOG.error("Reached max number of PENDING connections, stop sending!");
                    }
                });

        return result;
    }
}

My desired behavior is that as soon as I get the PoolAcquirePendingLimitException, the for loop above immediately breaks.

Igor Artamonov
@splix
Can someone help me with the Reactor context? I have an ugly but working solution, as described at https://stackoverflow.com/questions/74354786/propagate-reactor-context-back, but I want to find a correct way to propagate a context from an inner publisher. Is that possible?
6 replies
Burak Akça
@burakakca
Does Reactor support IPC?
StevenCurran
@StevenCurran
Hey all, I'm trying to implement something with TcpServer which will receive a large payload (a custom string, potentially XML), do some processing, and reply back on the outbound channel based on some data from the inbound. However, as it is chunked, I cannot seem to correctly concat the inbound data together. I have a small gist example here, where I just want to take the large incoming payload and convert it to upper case, but without it being chunked, so the flatMap op would occur only once.
https://gist.github.com/StevenCurran/c486306b55187064a9d7f066d15d797e
Checking the example, and the tests in the repo, most of the examples just pass back some Mono.just("string"), but I need the large message from the inbound channel in its entirety. I have tried inbound.receive().asString().reduce(String::concat) but with no luck. I believe I need something like JsonObjectDecoder / ChannelInboundHandlerAdapter, but I'm struggling to find any examples of its use in Reactor.
27 replies
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.NettyOutbound;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         // inbound.connection.addHandlerLast(): is there a combiner I can use here?
                         .handle((inbound, outbound) ->
                                 // Tried to concat here, but then the flatMap is never called:
                                 // inbound.receive().asString().reduce(String::concat)
                                 inbound.receive().asString().flatMap(s -> {
                                     // this is chunked, but I need s to be the complete string
                                     NettyOutbound out = outbound.sendString(Mono.just(s.toUpperCase()));
                                     return out;
                                 }))
                         .bindNow();

        server.onDispose()
              .block();
    }
}
Mico Piira
@micopiira

Hey all. The new automatic ThreadLocal propagation feature in Reactor Core 3.5.0 works only for 'handle' and 'tap' operators. I would like to propagate all thread locals that have been registered to the ContextRegistry from the thread that calls 'block' to all operators.

With these 2 separate hooks I think I got it working, but I would like to know if there is something wrong with this, or if there is a better way to achieve it?

        Hooks.onEachOperator("propagate", Operators.liftPublisher((publisher, coreSubscriber) -> new CoreSubscriber<>() {
            @Override
            public Context currentContext() {
                return coreSubscriber.currentContext();
            }


            @Override
            public void onSubscribe(Subscription s) {
                try (ContextSnapshot.Scope ignored = ContextSnapshot.setAllThreadLocalsFrom(coreSubscriber.currentContext())) {
                    coreSubscriber.onSubscribe(s);
                }
            }

            @Override
            public void onNext(Object o) {
                try (ContextSnapshot.Scope ignored = ContextSnapshot.setAllThreadLocalsFrom(coreSubscriber.currentContext())) {
                    coreSubscriber.onNext(o);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                coreSubscriber.onError(throwable);
            }

            @Override
            public void onComplete() {
                coreSubscriber.onComplete();
            }
        }));

        Hooks.onLastOperator("contextCapture", objectPublisher -> {
            if (objectPublisher instanceof Mono) {
                return Mono.from(objectPublisher).contextCapture();
            }
            if (objectPublisher instanceof Flux) {
                return Flux.from(objectPublisher).contextCapture();
            }
            return objectPublisher;
        });
2 replies
revganjesh
@revganjesh
Hi
I am looking for a way to iterate over the incoming request and attach the remote service responses back to it:
  1. Iterate the Array of items in the incoming request
  2. send the item to remote service to fetch response
  3. update or attach the received response back to the corresponding item in the request
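The three steps above map naturally onto fromIterable + flatMap + map. A self-contained sketch (Item, fetchRemote, and the field names are illustrative stand-ins, not a specific API):

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class EnrichItems {
    record Item(String id, String response) {}

    static List<Item> enriched;

    // Stand-in for the remote service call.
    static Mono<String> fetchRemote(Item item) {
        return Mono.just("resp-" + item.id());
    }

    public static void main(String[] args) {
        List<Item> request = List.of(new Item("a", null), new Item("b", null));

        enriched = Flux.fromIterable(request)                    // 1. iterate the items
                .flatMap(item -> fetchRemote(item)               // 2. call the remote service
                        .map(resp -> new Item(item.id(), resp))) // 3. attach the response
                .collectList()
                .block();

        enriched.forEach(i -> System.out.println(i.id() + " -> " + i.response()));
    }
}
```

If the order of items must match the incoming request, flatMapSequential can replace flatMap.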
Igor Artamonov
@splix
Another silly question: how do I fire a side event after a subscription to a Flux.fromPublisher(sink) is fully propagated to the sink itself? flux.doOnSubscribe() doesn't help because it happens earlier.
The detailed question at https://stackoverflow.com/questions/74537433/fire-event-after-reactor-publisher-subscribed
1 reply
Ivan Vyazmitinov
@ivyazmitinov

Hello, dear Reactor community!

In light of the release of Project Loom and some claims that the reactive approach on the JVM will soon be dead, I decided to write up a detailed series of posts describing why that may not be so certain. I have already published 3 out of 4 parts (it struck me to share it here very late) and hope you will find it helpful. And, of course, any feedback is appreciated!

HaloFour
@HaloFour
I think it depends on how you use Reactor. If you're primarily using Mono<T> as futures to compose I/O-bound operations in, say, a Spring WebFlux service, then yeah, I think Loom will make a pretty big dent. More complex flows I think will retain the advantage of using Reactor, to the extent that it would be like an async version of Java Streams.
1 reply
Igor Artamonov
@splix
I think the main problem that causes people to avoid Reactor is the inability to get good traces and to debug in general. But maybe it can be fixed? Right now it is all based on JVM stack traces, which are useless in such an environment. But maybe Reactor could use a special context to keep a reactor-trace of the execution? I.e. it doesn't stop with Mono.error(Throwable), but with a special type of event that unrolls the execution context and provides a new type of exception. Kind of what the JVM does to build a stack trace, but with all the Reactor specifics instead. I know there is .checkpoint, but for some reason it's not very helpful. Maybe because it moves the responsibility to the developer.
11 replies
HaloFour
@HaloFour
I think that impacts operational use of Reactor, but I think the bigger issue (by orders of magnitude) is that reactive programming is extremely different from, and much more difficult than, imperative programming in vanilla Java. Kotlin, with coroutines, can smooth that out, but you still end up with the "colored function" problem.
25 replies
Igor Artamonov
@splix
I had a question above, but maybe now is a better time to ask it again. What is the best approach to do something after being fully subscribed to a Sink, when that "something" causes a response from that sink, sometimes too fast? If I use .doOnSubscribe, I get the response back through the sink almost immediately, sometimes before it is fully subscribed, so I lose the response, and the thread that sends to the sink shows an error with "no subscriber".
Mario
@mmaryo
Hello, I'm looking for a way to run a process only on the first element of a Flux.
Is there a method for that?
The only way I found is to do something like:
            Flux.create { emitter ->
                var firstTime = true
                // create a stream
                createStream().map {
                    if (firstTime) {
                        // ....
                        firstTime = false
                        emitter.next(...)
                        emitter.complete()
                    } else {
                        // ....
                    }
                }
            }
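A built-in operator worth checking for this is switchOnFirst, which hands you the first signal plus the whole sequence, so the first element can be treated specially without a manual flag. A sketch (multiplying the first element is just a placeholder for "process it"):

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class FirstElementSpecial {
    static List<Integer> out;

    public static void main(String[] args) {
        out = Flux.range(1, 5)
                .switchOnFirst((signal, flux) -> {
                    if (signal.hasValue()) {
                        int first = signal.get();
                        // process the first element specially, keep the rest as-is
                        return flux.skip(1).startWith(first * 100);
                    }
                    return flux; // empty or error source: pass through unchanged
                })
                .collectList()
                .block();
        System.out.println(out);
    }
}
```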
8 replies
Philipp Paul
@philurlaub

Hi, I want to replay the last element of a Flux and keep the full sequence. Currently I use:

        var f = Flux.range(0, 10);
        var res = Flux.concat(
            f,
            f.last()
        );

which seems to work in simple examples and in my tests, but if I run it in an endpoint returning Flux<byte[]> I get a NoSuchElementException. Is this in general a bad idea?

8 replies
Subhalakshmi Krishnasamy
@Subhalakshmi1986
Where can I find a best-practices guide for Reactor usage?
Violeta Georgieva
@violetagg
@Subhalakshmi1986 Did you check https://projectreactor.io/learn
Amudhan
@amudhangunasekaran

Hi, suppose I have a Sinks.many().unicast().onBackpressureError() and created a Flux out of it, and I am using it as a producer. Now, at a later point in time, I want this first sink to receive messages from another sink; that is, whenever the new/second sink emits messages, this first sink should also emit the same message. Is it possible to achieve this? (If Sinks.many().unicast() is a bad idea and multicast() or replay() would work, I am fine with changing to that too.)

Full use case:
I'm creating a WebSocket chat server, and when a session is created, I don't know which chat groups this session will subscribe to and receive messages from. The chat groups themselves can be created after a WebSocket session is created.

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Sinks.Many<String> unicast = Sinks.many().unicast().onBackpressureError();
        Mono<Void> receiver =
                session
                    .receive()
                    //process incoming messages
                    .then();

        Mono<Void> sender =
                session
                    .send(unicast.asFlux().map(session::textMessage))
                    .doFinally(this::cleanUp);

        return Mono.zip(receiver, sender).then();
    }

If this session later sends a message to join another group (for which I have a Sinks.many().replay().limit(10) and would emit from this sink whenever a message is received on this group), how can I connect the original sink with this one, so that when the second sink emits, the first sink emits the message to the session?
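One way to wire this up (a sketch, not a vetted design): when the session joins a group, subscribe to the group sink's Flux and re-emit each element into the session's sink, keeping the Disposable so you can unsubscribe on leave/close. This demo uses unicast().onBackpressureBuffer() for the session sink so emissions are buffered until the single subscriber attaches; the original question used onBackpressureError():

```java
import java.util.List;

import reactor.core.Disposable;
import reactor.core.publisher.Sinks;

public class SinkBridge {
    static List<String> received;

    public static void main(String[] args) {
        Sinks.Many<String> sessionSink = Sinks.many().unicast().onBackpressureBuffer();
        Sinks.Many<String> groupSink = Sinks.many().replay().limit(10);

        // The bridge: every emission on the group sink is forwarded to the session sink.
        Disposable bridge = groupSink.asFlux()
                .subscribe(sessionSink::tryEmitNext);

        groupSink.tryEmitNext("hello group");
        sessionSink.tryEmitComplete();

        received = sessionSink.asFlux().collectList().block();
        System.out.println(received);

        bridge.dispose(); // on leave/close, stop forwarding
    }
}
```

In the real handler, the session could hold a Map of group name to Disposable, disposing entries as the session leaves groups and disposing all of them in the doFinally cleanup.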