Knut Schleßelmann
@kschlesselmann
Hi! Does anyone have an idea how I can find out why this code never completes?
override fun getAttributeInformation(attribute: Attribute): Mono<AttributeInformation> {
    val input = data()
            .doOnSubscribe { logger.warn { attribute.id } }
            .doOnNext { logger.info { "ID: ${it.id}" } }
            .publish()
            .autoConnect(2)

    val date = input
            .doOnNext { logger.info { "ID: ${it.id}" } }
            .map { it.lastModifiedDate }
            .max()
            .doOnNext { logger.info { "date is $it" } }

    val relatedInformation = input
            .doOnNext { logger.info { "ID: ${it.id}" } }
            .flatMapIterable { it.toPairs(attribute.id) }
            .groupBy({ it.first.value }) { it.second }
            .flatMap { g -> g.collect(Collectors.toSet()).map { g.key()!! to it } }
            .collectMap({ it.first }) { it.second }
            .map { RelatedInformation(byValue = it) }
            .doOnNext { logger.info { "related is done" } }

    return Mono.zip(date, relatedInformation)
            .map { AttributeInformation(attributeId = attribute.id, lastModifiedDate = it.t1, related = it.t2) }
            .doOnSuccess { logger.warn { attribute.id } }
            .doOnError { logger.error(it) { attribute.id } }
}
The only I/O-bound process is data(), and I get a lot of IDs in my log that do not seem to be consumed by date or relatedInformation.
Michael Beknazarov
@Michael17342257_twitter
Hi guys!
I have the following issue: to invoke a web service, I need to get some params from Mongo. I don't want to use blocking I/O, so I use subscribe, but I can't get the value out of subscribe. Could anyone suggest a good way to get the value using the subscribe method?
Gautier DI FOLCO
@blackheaven
Hi all,
Is there a way to cache a Mono, only after 30s for example?
Antriksh vijay
@vijay_antriksh_twitter
Hi, I am new to Project Reactor and would really like to understand how the threading model works in reactor-netty.
I understand that an event loop is a single thread to which channels are assigned. But once an event is dispatched through the Channel, how does the event loop get back to processing more requests? How are 4 threads (event loops) able to handle so many requests?
Anil Gursel
@anilgursel

How can I make sure to clean up the resources (InputStream and ByteBuf) if a TimeoutException is thrown in this scenario?

@Test
public void testReleasingResources() throws InterruptedException {


    final String result =
            HttpClient.create(ConnectionProvider.fixed("dummy", 10))
                .protocol(HttpProtocol.HTTP11)
                .keepAlive(true)
                .request(HttpMethod.GET)
                .uri("http://localhost:" + mockServer.port() + "/hello")
                .responseSingle((response, bytes) -> bytes
                        .asInputStream()
                        .map(is -> new SomeClass(is))
                        .flatMap(sc -> slowMethod(sc))
                )
                .timeout(Duration.ofMillis(500))
                .map(someClass -> convertToString(someClass))
                .block();

    Assert.assertEquals("Hello World!", result);
}

block() etc. is used just to simplify the test.

@violetagg @simonbasle We discussed this during SpringOne Platform and you suggested using doOnDiscard and providing a Consumer for specific types, e.g., InputStream, ByteBuf or SomeClass. I added a bunch of doOnDiscard calls for different types, but I do not see doOnDiscard being called at all. I assume this is because there is no element in the buffer of any operator? Does the timeout happen while the element is being processed in one of the operators?

I would very much appreciate any suggestion here.

You can find the full dummy test code here https://gist.github.com/anilgursel/301efbac56f1a6755956da6f63d8a0df

tofflos
@tofflos
I'm looking for something that will turn a list of forecasted temperatures into a stream of time-released events/marbles. Let's say I have a map of 07:00->10 degrees, 10:00->12 degrees, 11:00->14 degrees, ... Is there some convenience functionality that I can use together with combineLatest to calculate the quality of the forecast as the measurements come in?
AnonyCat
@fjacobs
@tofflos what exactly is in the other stream that you are combining with the forecasts stream?
or is it just a calculation that you want to run on the forecast stream
tofflos
@tofflos
Measured temperatures from a sensor. So for this example I would be interested in a "quality" stream that is calculated as forecasted - measured.
Volkan Yazıcı
@vy
Hello! Why doesn't Flux#thenReturn exist while Mono#thenReturn does?
Daniil Mikhaylov
@mdsina
Hi!
Is it possible to use the same elastic thread for several operations?
I have multiple blocking operations and want them to run on a single thread because of some ThreadLocal issues.
Thanks
AnonyCat
@fjacobs
@tofflos maybe you could use cache on the end of the forecasts flux and use concat after it instead of combine
then the measured temperature flux determines the signal to downstream while automatically combining with the latest known forecast
AnonyCat
@fjacobs
Be aware that combineLatest will not emit an initial value until each observable emits at least one value. This is the same behavior as withLatestFrom and can be a gotcha as there will be no output and no error but one (or more) of your inner observables is likely not functioning as intended, or a subscription is late.
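A minimal sketch of the "quality" stream discussed above (forecasted - measured), written in Java with Reactor's Flux.combineLatest. The class and method names (ForecastQuality, quality) and the sample temperatures are made up for illustration; real sources would be time-based rather than Flux.just. The second call shows the gotcha: when one source never emits, nothing comes out and no error is raised.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class ForecastQuality {

    // Combines the latest forecast with the latest measurement as "forecast - measured".
    static Flux<Integer> quality(Flux<Integer> forecasts, Flux<Integer> measurements) {
        return Flux.combineLatest(forecasts, measurements,
                (forecast, measured) -> forecast - measured);
    }

    public static void main(String[] args) {
        // Both sources emit, so each new measurement is paired with the latest forecast.
        List<Integer> combined = quality(Flux.just(10), Flux.just(9, 11))
                .collectList()
                .block();
        System.out.println(combined);

        // The measurement source completes without emitting: the result is silently empty.
        List<Integer> silent = quality(Flux.just(10, 12, 14), Flux.<Integer>empty())
                .collectList()
                .block();
        System.out.println(silent);
    }
}
```

If the output should be driven only by the measurements, Flux#withLatestFrom on the measurement stream may be a closer fit than combineLatest.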
AnonyCat
@fjacobs
disclaimer: I am learning too
Simon Baslé
@simonbasle
@anilgursel might it be that the InputStream is already emitted to the map by the time timeout occurs? doOnDiscard's main intent is that the operators that buffer elements as part of their natural operation will pass these to the consumer(s) IF they haven't emitted said elements
so maybe that's not what you really need? I would expect that in your case it would be SomeClass that would be passed to the consumer. flatMap is supposed to apply doOnDiscard eg. if it held an element for backpressure purpose
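To illustrate the intent described here, a small Java sketch with an operator that does support the hook: filter hands the elements it drops to the doOnDiscard consumer. DiscardDemo and its method name are hypothetical. The hook is placed downstream of filter because it is propagated to upstream operators through the Context. This only shows the mechanism; it does not cover an element that is being processed inside a function when a timeout fires.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class DiscardDemo {

    // Returns the elements that filter dropped and passed to the discard hook.
    static List<Integer> discardedByFilter() {
        List<Integer> discarded = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
                .filter(i -> i % 2 == 0)
                // Registered downstream of filter; filter looks the hook up in the Context.
                .doOnDiscard(Integer.class, discarded::add)
                .blockLast();
        return discarded;
    }

    public static void main(String[] args) {
        System.out.println(discardedByFilter());
    }
}
```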
Anil Gursel
@anilgursel
@simonbasle I had also added doOnDiscard for SomeClass, which did not help either. When the timeout is triggered, the element is being processed inside one of the operators; it is not in a buffer (and not back-pressured). What is the best way to deal with this? Do we need to add a try/catch inside all such operators? I was looking for a hook (like doOnDiscard, doOnError) which would be called when an error/cancellation happens, but which also allows me to access the resource to clean it up.
Simon Baslé
@simonbasle
yeah, at this point there's nothing reactor can really do to propagate the cancel signal to an arbitrary block of code in an arbitrary operator :( what if the cancellation happens right in the middle of your mapping function for instance? even the "onCancel Consumer" style wouldn't help there, because it would "change the state" of your object while your code is still using it.
best bet is probably to just try/catch if that is even a possibility
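One possible shape for the try/catch approach, as a plain Java sketch. GuardedMapping, applyOrClose and TrackedStream are hypothetical names, not Reactor or reactor-netty API. The helper closes the stream only when the wrapped function throws, so the resource stays open on the happy path. It does not help when a cancellation arrives while the function is still running, since the function then returns normally.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Function;

class GuardedMapping {

    // Applies body to the stream; if body throws, closes the stream before rethrowing.
    static <T> T applyOrClose(InputStream in, Function<InputStream, T> body) {
        try {
            return body.apply(in);
        } catch (RuntimeException e) {
            try {
                in.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    // Demo helper: a stream that remembers whether close() was called.
    static final class TrackedStream extends ByteArrayInputStream {
        boolean closed;

        TrackedStream() {
            super(new byte[0]);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    // Runs a failing body and reports whether the stream was closed as a result.
    static boolean closedWhenBodyFails() {
        TrackedStream in = new TrackedStream();
        try {
            applyOrClose(in, s -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException expected) {
            // The failure is expected; only the cleanup matters here.
        }
        return in.closed;
    }

    public static void main(String[] args) {
        System.out.println(closedWhenBodyFails());
    }
}
```

In a pipeline like the one in the test above, this could be used as .map(is -> GuardedMapping.applyOrClose(is, SomeClass::new)), assuming SomeClass has a constructor taking an InputStream.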
Anil Gursel
@anilgursel

what if the cancellation happens right in the middle of your mapping function for instance?

When an exception happens and the stream is canceled, wouldn’t it cancel the execution on all operators? Is there any possibility that the stream ends with error and that operator keeps running?

If not, then having a doOnError which provides access to the element would help, IMO. At that point, one can check the state and determine what to do.

Simon Baslé
@simonbasle
a stream is not "cancelled" as in thread interruption... if the cancel occurs while eg. a synchronous map is applying its Function, this doesn't interrupt the function (there's no way to do that).
similarly, your idea of a doOnError with "access to the element" doesn't work, because there isn't necessarily an element (and there's no way for operators to communicate state to other operators, outside of onNext/onError)
doOnDiscard is using the Context to approximate that
Anil Gursel
@anilgursel
would doOnDiscard, doFinally, doOnError wait for the map to finish running the Function, though?
in other words, is there any guarantee in the order of executions?
Simon Baslé
@simonbasle
doOnDiscard doesn't have any interest in onNext|onComplete|onError|cancel signals. it just puts your Consumer(s) in the Context under a key that each supporting operator has to explicitly look for in its implementation. not 100% of operators support it (because it only makes sense for a subset of operators to pay that overhead)
have you looked at the using operator? I wonder if it could fit the bill here
scoping the InputStream/ SomeClass conversion and its usage, and allowing to call close() on it when the downstream cancels
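A rough Java sketch of that scoping idea with Mono.using, assuming the resource can be closed once the scoped Mono terminates. Resource and slowMethod are hypothetical stand-ins for SomeClass and the slow step, and the durations are arbitrary. When the timeout cancels the upstream, the cleanup callback runs.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;

class UsingDemo {

    // Hypothetical stand-in for SomeClass: records whether close() was called.
    static final class Resource implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean();

        @Override
        public void close() {
            closed.set(true);
        }
    }

    // Hypothetical slow step that takes far longer than the timeout below.
    static Mono<String> slowMethod(Resource resource) {
        return Mono.delay(Duration.ofSeconds(5)).map(tick -> "Hello World!");
    }

    // Runs the slow step under a timeout and reports whether the resource was closed.
    static boolean closedAfterTimeout() {
        Resource resource = new Resource();
        String result = Mono.using(() -> resource, UsingDemo::slowMethod, Resource::close)
                .timeout(Duration.ofMillis(100))
                .onErrorReturn("timed out")
                .block();
        return "timed out".equals(result) && resource.closed.get();
    }

    public static void main(String[] args) {
        System.out.println(closedAfterTimeout());
    }
}
```

Note that using also runs the cleanup on normal completion and on error, so it fits only when the resource is not needed after the scoped Mono has finished.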
Anil Gursel
@anilgursel
I will check using in more detail, but from a quick glance, it looked to me like it is meant for resources that are created outside of the stream. In my case, the resource is created inside the stream. The resource will potentially continue to exist outside of the stream on the happy path, but I would like to make sure it gets cleaned up in case of an exception/cancellation.
Sunil Yadav
@aiod-sunil
Hi all,

I am getting this error java.lang.ClassCastException: reactor.core.publisher.FluxMapFuseable incompatible with java.lang.String

Do you have any idea?

at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.invoke(KotlinLambdaToFunctionAutoConfiguration.java:147) ~[spring-cloud-function-kotlin-3.0.0.M3.jar:3.0.0.M3]
Sergei Egorov
@bsideup
@aiod-sunil it seems that you're using spring-cloud-function-kotlin-3.0.0.M3, have you tried the release?
Sunil Yadav
@aiod-sunil

plugins {
    id("org.springframework.boot") version "2.2.0.RELEASE"
    id("io.spring.dependency-management") version "1.0.8.RELEASE"
    kotlin("jvm") version "1.3.50"
    kotlin("plugin.spring") version "1.3.50"
}

group = "com.aiod"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_1_8

repositories {
    mavenCentral()
    maven { url = uri("https://repo.spring.io/milestone") }
}

val springCloudVersion = "Hoxton.M3"

Let me try the release version.
Sunil Yadav
@aiod-sunil
@bsideup I changed to Greenwich.SR3

Execution failed for task ':compileKotlin'.

Could not resolve all files for configuration ':compileClasspath'.
Could not find org.springframework.cloud:spring-cloud-function-kotlin:.
Required by:
project :

I do not see the Kotlin function dependencies.
Sergei Egorov
@bsideup
oh, my bad, it seems that the Hoxton train is not released yet. Anyway, consider asking for help in the Spring Cloud community; the error you get actually has nothing to do with Reactor.
Sunil Yadav
@aiod-sunil
alright thank you so much @bsideup
Sergei Egorov
@bsideup
No problem! I hope they will help you figure out the problem 👍
Olan Byrne
@olanb7

Hey all - I'm building out a webflux app and trying to push/evangelize it to the wider org, but there are some 'simple' things that I can't help feeling I'm not doing right. The main one is 'domain' validation - i.e. throwing an error from a service class. Currently I'm doing something like this:

Mono<Boolean> isValidFuture = ContextUtils.getUserJwt()
                .map(JwtUtils::getUserId)
                .zipWith(itemFuture, validItemUser())
                .filter(Boolean::booleanValue)
                .switchIfEmpty(Mono.error(new RuntimeException("User can't act on this item")));

I then have to wire this isValidFuture in with the code that should do something after validation has been performed. Is there a simpler/more idiomatic way to achieve this?

Olan Byrne
@olanb7

i.e. I end up doing something like:

 return isValidFuture.flatMap(x -> Mono.zip(itemUpdateFuture, statusUpdateFuture))
                                          .map(tuple2 -> new NiceResponseObject(tuple2.getT2()));

it's this bit ^ that I feel is suboptimal, but can't think of a way to have the framework subscribe to the isValidFuture any other way.

novemberswimmer
@novemberswimmer
Hi all, I just joined this room and just started learning Spring Reactor. I have a ReactiveCassandraRepository and am using findAll to get all records back from my repo. From what I have read so far, I thought I always need to call subscribe() on a Flux before I can get the data. My question is: how come I get all the data when calling findAll() on ReactiveCassandraRepository, even without subscribe()?