Anton Duyun
@Squiry
@mp911de we have delayUntil in StartupFlow.
It requests Integer's max value.
Also, the windows blow up our heap with Object[32] arrays.
Mark Paluch
@mp911de

I played with tracking the demand in the context of r2dbc/r2dbc-postgresql#222:

        // track the demand observed at the NettyInbound receiver
        AtomicLong demand = new AtomicLong();
        Mono<Void> receive = connection.inbound().receive()
            .doOnNext(ignore -> demand.decrementAndGet())
            .doOnRequest(demand::addAndGet)
            .then();

and printed the demand in the previously available doOnNext(fluxOfFluxes)

The initial demand was 256, and after ~258 issued commands that use createStatement(…).execute().flatMap(result).next(), the demand went to 0.
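
A minimal sketch of how that demand can be printed per window, assuming fluxOfFluxes is the Flux of windows mentioned above (hypothetical wiring, not the actual driver code):

        // hypothetical: log the currently measured NettyInbound demand for each emitted window
        fluxOfFluxes.doOnNext(window ->
            System.out.println("doOnNext(fluxOfMessages): Current demand: " + demand.get()));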
Sergei Egorov
@bsideup
keep in mind that windowWhile is not backpressure-aware, if I remember correctly
Mark Paluch
@mp911de
It kind of worked.
I observed that without cancelling, the global demand measured at NettyInbound kept incrementing one by one.
Anton Duyun
@Squiry
@mp911de The startup flow requests s.request(Long.MAX_VALUE).
Mark Paluch
@mp911de
So after 256 regular commands the demand was ~ 500
Anton Duyun
@Squiry
FluxSink does have an onRequest callback, so we can propagate requests to NettyInbound.
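
A minimal sketch of that propagation, assuming a hypothetical inboundSubscription that represents the NettyInbound receive subscription:

        // hypothetical: wire the sink's demand and cancellation through to the inbound receiver
        sink.onRequest(n -> inboundSubscription.request(n));
        sink.onCancel(() -> inboundSubscription.cancel());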
Mark Paluch
@mp911de
windowWhile uses its prefetch value of 256 by default (https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/Flux.java#L9414). Also, FluxWindowPredicate uses some sort of request capping.
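
For reference, windowWhile also has an overload that takes the prefetch explicitly; a minimal sketch with hypothetical Message/isBoundary names:

        // windowWhile(predicate) defaults to prefetch = 256; the two-argument overload
        // lets the caller pick the upstream prefetch
        Flux<Flux<Message>> windows = messages.windowWhile(m -> !isBoundary(m), 1);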
Anton Duyun
@Squiry
But when I run the autoCommitByDefault test I get these requests:
9223372036854775807 (from delay until)
1
1
256
1
1
1
1
Mark Paluch
@mp911de
With the current code on master?
Anton Duyun
@Squiry
Yep. One second, I'll push it for you.
Mark Paluch
@mp911de
Yeah, the backpressure control on master is broken.
Give it a try with 0.8.0 (from the release) that uses windowWhile(…)
Anton Duyun
@Squiry
Yes, and I'm trying to fix it. One moment.
Mark Paluch
@mp911de
That code looks decent. We should keep track of the downstream demand and the sink demand.
For request(Long.MAX_VALUE), we can either rewrite delayUntil to something else or handle that case in the subscriber.
Anton Duyun
@Squiry
Question: if this request(Long.MAX_VALUE) was translated to reactor-netty, then how did the demand go to 0?
Mark Paluch
@mp911de
Capping the request at 256 or so seems like a good approach, but we also need to consider what happens when a sink goes away or when it wants more data: in that case we shouldn't request a huge value, but rather request more as we see more messages coming in.
Let me try to reproduce the issue
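
A rough sketch of that capping idea, with hypothetical names (this is not the actual r2dbc-postgresql code):

        // remember the full downstream demand, but never forward more than 256 upstream
        AtomicLong pendingDemand = new AtomicLong();

        void requestFromDownstream(long n) {
            pendingDemand.updateAndGet(current -> Operators.addCap(current, n)); // caps at Long.MAX_VALUE
            upstream.request(Math.min(n, 256)); // re-request in onNext(…) as messages are consumed
        }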
Mark Paluch
@mp911de
Okay, so with 0.8.0.RELEASE I get
256
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
The trick is that request(n) is intercepted by a subscriber of FluxWindowPredicate: WindowPredicateMain requests a batch from upstream via s.request(Operators.unboundedOrPrefetch(prefetch)), and prefetch = 256.
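
For reference, Operators.unboundedOrPrefetch(…) only turns an unbounded prefetch into Long.MAX_VALUE; for the default prefetch of 256 the operator simply requests 256:

        // reactor.core.publisher.Operators
        long initial   = Operators.unboundedOrPrefetch(256);               // 256
        long unbounded = Operators.unboundedOrPrefetch(Integer.MAX_VALUE); // Long.MAX_VALUE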
Anton Duyun
@Squiry
Any flatMap will cause request(256), yeah.
So we really can't track all those requests, and the demand at some point in time will be MAX_VALUE?
Mark Paluch
@mp911de
Hopefully not. I was looking for an approach where we can have a constant demand of 1 (or 256) the whole time, unless the receiver sink stops its demand (either because it is really busy processing data or because it sends a cancel()).
Anton Duyun
@Squiry
But request(256) for something like SELECT 1 (or any select that doesn't return 253 rows) will grow the demand.
So it should be a constant 1?
Mark Paluch
@mp911de
What we would need is a better window operator: one that keeps the demand constant and where the predicate can change based on the current response receiver/conversation.
I'm not sure which value (1 or 256) is better, but at least it should stay constant.
If I run multiple queries (0.8.0.RELEASE) without calling cancel, then the demand indeed grows
doOnNext(fluxOfMessages): Current demand: 255
doOnNext(fluxOfMessages): Current demand: 256

… after ~ 300 queries
doOnNext(fluxOfMessages): Current demand: 552
doOnNext(fluxOfMessages): Current demand: 553
doOnNext(fluxOfMessages): Current demand: 554
doOnNext(fluxOfMessages): Current demand: 555
doOnNext(fluxOfMessages): Current demand: 556
doOnNext(fluxOfMessages): Current demand: 557
Anton Duyun
@Squiry
The subscriber-based approach in my branch looks almost nice. But I can't see any way to keep the demand constant except putting .limitRequest(1) and flatMap(a -> b, 1) everywhere.
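
For illustration, a minimal sketch of those two operators with hypothetical Message/Result/handle names:

        // limitRequest(n) caps the total amount requested from upstream at n
        Flux<Message> atMostOne = messages.limitRequest(1);

        // flatMap(mapper, 1) runs with concurrency 1, so only request(1) goes to the main source at a time
        Flux<Result> results = messages.flatMap(message -> handle(message), 1);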
Mark Paluch
@mp911de
No, we need to handle that in the operator, along with some magic in sink.onRequest(…) and onCancel(…).
The first time we see a sink, we need to register with it so that we get its demand/cancellation signals.
Then we need to keep track of how many messages we've seen (Subscriber.onNext(…)) and count that value against the accumulated demand.
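
A rough sketch of that bookkeeping, with hypothetical names (not the actual r2dbc-postgresql operator):

        // uses org.reactivestreams.Subscription, reactor.core.publisher.FluxSink,
        // reactor.core.publisher.Operators and java.util.concurrent.atomic.AtomicLong
        class DemandTracker {

            private final AtomicLong demand = new AtomicLong();
            private final Subscription upstream;

            DemandTracker(Subscription upstream) {
                this.upstream = upstream;
            }

            // the first time we see a sink, register for its demand/cancellation signals
            void register(FluxSink<?> sink) {
                sink.onRequest(n -> {
                    demand.updateAndGet(current -> Operators.addCap(current, n));
                    upstream.request(Math.min(n, 256)); // never forward an unbounded request
                });
                sink.onCancel(() -> demand.set(0)); // a cancelled conversation no longer contributes demand
            }

            // called from Subscriber.onNext(…): count each message against the accumulated demand
            void onMessage() {
                demand.decrementAndGet();
            }
        }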
Anton Duyun
@Squiry
Oh, I kind of have an idea.
Mark Paluch
@mp911de
I tried a similar approach yesterday but wasn't able to come up with a good calculation on my first attempt. Buffering the overflow demand then completely derailed my thinking, so I wanted to revisit the approach in a few days and come up with a better design.
Anton Duyun
@Squiry
@mp911de Squiry/r2dbc-postgresql@a120489 ugly as hell but I think it works!
Mark Paluch
@mp911de
I'll have a look tomorrow
Panusitt Khuenkham
@bamossza

Hi everyone, I am experiencing a problem: the program closes the connection before the response data comes back.

Error: "Failed to obtain R2DBC Connection; nested exception is io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException",

......

[WARN ] 2020-01-26 18:18:41 [reactor-tcp-epoll-3] FluxReceive.warn : [id: 0x93a0e581, L:/10.244.0.35:48462 ! R:sqlsvrdev01.database.windows.net/ip:1433] An exception has been observed post termination

reactor.core.Exceptions$BubblingException: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed

......

[ERROR] 2020-01-26 18:18:41 [reactor-tcp-epoll-3] ChannelOperationsHandler.error : [id: 0x93a0e581, L:/10.244.0.35:48462 ! R:sqlsvrdev01.database.windows.net/ip:1433] Error was received while reading the incoming data. The connection will be closed.

......

application.yml
spring:
  r2dbc:
    username: sa
    password: zynWgTxxxx
    url: r2dbc:pool:mssql://qlsvrdev01.database.windows.net:1433/dev01
    pool:
      max-idle-time: 15
      initial-size: 10
      max-size: 100
      validation-query: select 1
      enabled: true
    sql-script-encoding: UTF-8

ENV
Spring Boot 2.2.2.RELEASE
io.r2dbc:r2dbc-mssql:0.8.0.RELEASE
io.r2dbc:r2dbc-pool:0.8.0.RELEASE
io.r2dbc:r2dbc-spi:0.8.0.RELEASE

Given the problem described above, can you suggest a solution?

Thank you so much.

I resolved it.
I think R2DBC still doesn't support some versions of Microsoft SQL Server.
Azure DevOps
Mark Paluch
@mp911de
R2DBC MSSQL generally works on Azure SQL. We've seen some reports regarding failover/routing issues but we need some support to make failover for SQL Server work.