Mark Paluch
@mp911de
Thank you Andreas. We wouldn’t be able to get here without all the support we received from the community.
Andreas Killaitis
@killaitis
Perhaps we should update this room's title to reflect and honour the big milestone… ;)
Mark Paluch
@mp911de
Good point.
Darren Boo
@darrenbkl

Hi, I encounter this error when the DB goes down:

ERROR [tor-tcp-epoll-4] i.r.p.client.ReactorNettyClient          : Connection Error

io.netty.channel.unix.Errors$NativeIoException: syscall:read(..) failed: Connection reset by peer
        at io.netty.channel.unix.FileDescriptor.readAddress(..)(Unknown Source)

WARN [tor-tcp-epoll-4] i.n.c.AbstractChannelHandlerContext      : An exception 'reactor.core.Exceptions$ErrorCallbackNotImplemented: io.netty.channel.unix.Errors$NativeIoException: syscall:write(..) failed: Broken pipe' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:

io.netty.channel.unix.Errors$NativeIoException: syscall:read(..) failed: Connection reset by peer
        at io.netty.channel.unix.FileDescriptor.readAddress(..)(Unknown Source)

When the DB comes back up, all queries get stuck, but there is no error.
I'm using r2dbc-postgresql and r2dbc-pool. Any idea?

Mark Paluch
@mp911de
Can you file a ticket in the corresponding driver project?
Darren Boo
@darrenbkl
ok
Mark Paluch
@mp911de
@mirromutth That one might be of interest to you: r2dbc/r2dbc-postgresql#175
By excluding netty-codec-http we break other functionality, so we're going to revert that change in r2dbc-postgresql and r2dbc-mssql until Reactor Netty fixes their setup.
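For readers following along: the change being reverted is a Maven-level exclusion. A sketch of what such an exclusion looks like in a POM; attaching it to the reactor-netty dependency is an assumption about where it lived:

<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <exclusions>
        <!-- the exclusion being reverted: dropping netty-codec-http broke other functionality -->
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty-codec-http</artifactId>
        </exclusion>
    </exclusions>
</dependency>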
Mirro Mutth
@mirromutth
@mp911de Understood, thanks a lot.
In addition: does Reactor Netty have any plans to split the artifact into modules?
Mark Paluch
@mp911de
Yes, they have
While you're here: I'm currently investigating failures during Result.map(…). I had two cases in which exceptions thrown in the mapping function were not propagated and the resulting stream in Result.map(…) got stuck.
That's kind of hard to reproduce reliably. In my case, I was able to reproduce it with Postgres by issuing another query while inside map(…) and then running pkill -9 postgres.
I need to investigate further what's going on, but I guess I will need some help from our Reactor folks
Mirro Mutth
@mirromutth
Do you have any test case/code to reproduce this?
Yeah, I mean Postgres
Mirro Mutth
@mirromutth
Ah, now I think I understand. I will try to reproduce it with MySQL after Oct 4.
Darren Boo
@darrenbkl
Btw, is it possible to use another pool with R2DBC, like Hikari?
Mark Paluch
@mp911de
@darrenbkl No, Hikari is a blocking pool but maybe you want to leave a GitHub ticket asking for R2DBC support at https://github.com/brettwooldridge/HikariCP
@mirromutth No worries. I currently also do not understand what's happening, and we need to investigate further why error signals are not propagated by flatMap/concatMap.
You can reproduce it with a locally running Postgres instance and the following (crazy) code:
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:postgres://postgres:foo@localhost/postgres");

int counter = 0;
while (true) {
    Connection connection = Mono.from(connectionFactory.create()).block();

    AtomicLong atomicLong = new AtomicLong();
    System.out.println(counter++);
    AtomicReference<Flux<Integer>> ref = new AtomicReference<>();
    Flux<Integer> integerFlux = Flux.from(connection.createStatement("SELECT * from pg_catalog.pg_type").execute()).flatMap(it -> it.map((row, rowMetadata) -> {

        try {
            if (atomicLong.incrementAndGet() % 10 == 0) {
                // break point here, then pkill -9 postgres
                ref.get().subscribe();
            }
        } catch (RuntimeException e) {
            throw e;
        }

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return 1;
    }));

    ref.set(integerFlux);

    integerFlux.log("io.r2dbc.postgresql", Level.SEVERE).blockLast();

    Mono.from(connection.close()).subscribe();
}
Mark Paluch
@mp911de
R2DBC 0.8.0 RC2 is out now. Mostly addressing dependency issues in drivers, see https://r2dbc.io/2019/10/07/r2dbc-0-8-rc2-released
Erik Dreyer
@edreyer
I'm getting the following error using what I think are the latest libraries:
Caused by: java.lang.AbstractMethodError: Receiver class io.r2dbc.postgresql.PostgresqlConnection does not define or inherit an implementation of the resolved method abstract validate(Lio/r2dbc/spi/ValidationDepth;)Lorg/reactivestreams/Publisher; of interface io.r2dbc.spi.Connection.
    at io.r2dbc.pool.ConnectionPool.lambda$createConnectionPool$7(ConnectionPool.java:150) ~[r2dbc-pool-0.8.0.RC2.jar:0.8.0.RC2]
    at reactor.pool.SimplePool$QueuePooledRef.lambda$release$0(SimplePool.java:280) ~[reactor-pool-0.1.0.RELEASE.jar:0.1.0.RELEASE]
    ... 84 common frames omitted
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-r2dbc</artifactId>
            <version>1.0.0.RC1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.r2dbc/r2dbc-postgresql -->
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-postgresql</artifactId>
            <version>1.0.0.M7</version>
        </dependency>
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-pool</artifactId>
            <version>0.8.0.RC2</version>
        </dependency>
Mark Paluch
@mp911de
@edreyer 0.8.0.RC2 is the latest version of R2DBC. Please upgrade the driver from 1.0.0.M7 to 0.8.0.RC2.
Here's a post that explains the change in numbering: https://r2dbc.io/2019/05/13/r2dbc-0-8-milestone-8-released
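Concretely, the fix for the POM above is to move the driver off the old 1.0.0.M7 numbering; per the linked post, the artifact continues as io.r2dbc:r2dbc-postgresql under the 0.8.x scheme, and the other two dependencies stay as they are:

        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-postgresql</artifactId>
            <version>0.8.0.RC2</version>
        </dependency>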
Erik Dreyer
@edreyer
Very helpful. That did it. Thanks for the assist
Erik Dreyer
@edreyer

I'd love some help on the following. Let's say I have a Flux<Tweet> retrieved from the Twitter API. There are multiple subscribers to this Flux. In the Flux pipeline, I've attempted to set it up so that each Tweet is saved to the DB, but I'm getting errors due to multiple threads trying to save the same Tweet. How can I solve this?

Here's how I set it up:

    public Flux<Tweet> tweets(@RequestParam String topic) {
        LOG.info("GET tweets: {}", topic);
        return twitterService.startFilter(topic)
            .flatMap(this::saveTweet);
    }

    @Transactional
    protected Mono<Tweet> saveTweet(Tweet tweet) {
        return twitterService.findByTwitterId(tweet.getTweetId())
            .switchIfEmpty(twitterService.saveTweet(tweet));
    }

I'm getting errors due to a violation of a unique index: duplicate key value violates unique constraint "tweet_tweetid_uindex"

To clarify: twitterService.startFilter(topic) only creates the Flux on the first request. Subsequent requests get the existing one.
Mark Paluch
@mp911de
That sounds as if there were true duplicates. At the time findByTwitterId runs, there may be other active transactions saving the same tweet. So basically this looks like a regular concurrency issue, and receiving a key constraint violation error is the right thing to happen.
You probably want to catch the error in that case and move on, so onErrorResume would be the appropriate operator.
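A minimal sketch of that suggestion, reusing the names from the snippet above. It assumes Spring's exception translation is active, so the duplicate-key error surfaces as org.springframework.dao.DataIntegrityViolationException; the fallback simply re-reads the row that won the race:

import org.springframework.dao.DataIntegrityViolationException;
import reactor.core.publisher.Mono;

protected Mono<Tweet> saveTweet(Tweet tweet) {
    return twitterService.findByTwitterId(tweet.getTweetId())
        .switchIfEmpty(twitterService.saveTweet(tweet))
        // a concurrent transaction inserted the same tweet first:
        // resume by loading the row that won the race
        .onErrorResume(DataIntegrityViolationException.class,
            e -> twitterService.findByTwitterId(tweet.getTweetId()));
}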
Erik Dreyer
@edreyer

Thanks Mark. I understand that the find... call might run in a different thread and transaction than the save... call, but how does this work with R2DBC when using @Transactional to try to put both of those operations together? I was attempting to set a transactional boundary around the "if it's not in the DB then save it" logic.

I tried onErrorResume to attempt another findByTwitterId... operation and ran into another issue.

I'm here at SpringOne in Austin. If you're here, would you be willing to spare a few minutes to take a look with me?

Mark Paluch
@mp911de
It's not about threads, it's about ordering. All activity is serialized, and the same applies to transactions. You cannot operate concurrently on a transactional resource, as the resource basically enforces serialization.
Still, when you're in a transaction and issue a SELECT on e.g. Postgres, you won't see a row that is being inserted by another transaction, as Postgres uses a previous snapshot and reports that there's no such row.
So just because you're using a transaction doesn't necessarily mean the SELECT gets delayed until another INSERT transaction completes.
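One way to sidestep the race entirely is to push the "insert if absent" decision into the database. A minimal sketch using Postgres's INSERT … ON CONFLICT DO NOTHING through DatabaseClient, mirroring the databaseClient.execute(…) usage that appears later in this log; the table name, column names, and tweet.getText() accessor are assumptions for illustration:

// insert-if-absent in a single statement; the unique index on tweetid
// resolves the race inside Postgres instead of in application code
Mono<Integer> rowsInserted = databaseClient
    .execute("INSERT INTO tweet (tweetid, text) VALUES (:id, :text) ON CONFLICT (tweetid) DO NOTHING")
    .bind("id", tweet.getTweetId())
    .bind("text", tweet.getText())
    .fetch()
    .rowsUpdated();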
In any case, happy to meet @edreyer if you're still around. Drop me a DM/mail and we can arrange something around noon/lunch time.
Foxie
@WinteryFox
Hi everyone, I just started using R2DBC and I'm getting this error when trying to create a connection pool: java.lang.UnsupportedOperationException: Reflective setAccessible(true) disabled
Here's the snippet:
import io.r2dbc.pool.ConnectionPool
import io.r2dbc.pool.ConnectionPoolConfiguration
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration
import io.r2dbc.postgresql.PostgresqlConnectionFactory

private val pool = ConnectionPool(
    ConnectionPoolConfiguration.builder(
        PostgresqlConnectionFactory(
            PostgresqlConnectionConfiguration.builder()
                .host("localhost") // host name only, not a connection URL
                .port(5432)
                .username("postgres")
                .password("")
                .database("jmdict")
                .build()
        )
    )
        .maxSize(20)
        .build())
Mark Paluch
@mp911de
A stack trace would be helpful
Foxie
@WinteryFox
Yeah, it was most likely due to mismatched versions. I redid my POM and it works now.
Andreas Killaitis
@killaitis

Today I replaced some older code in our app where we used databaseClient.execute("insert...") instead of databaseClient.insert() because of issue 119, which has since been closed, so I gave insert() a new try. Now I'm running into strange behaviour when it comes to transactional error handling. While the code using execute("insert...") works fine and as intended, the code using insert() does not seem to handle exceptions (e.g. duplicate key errors) correctly.

// depends on table foo: "CREATE TABLE foo (id INTEGER PRIMARY KEY)"

TransactionalOperator operator = TransactionalOperator.create(new R2dbcTransactionManager(connectionFactory));

// Version 1
Mono.empty()
        .then(databaseClient.execute("insert into foo (id) values(:id)").bind("id", 1).fetch().rowsUpdated())
        .then(databaseClient.execute("insert into foo (id) values(:id)").bind("id", 1).fetch().rowsUpdated())
        .as(operator::transactional)
        .onErrorResume(throwable -> Mono.just(42))
        .block();

// Version 2
Mono.empty()
        .then(databaseClient.insert().into("foo").value("id", 1).fetch().rowsUpdated())
        .then(databaseClient.insert().into("foo").value("id", 1).fetch().rowsUpdated())
        .as(operator::transactional)
        .onErrorResume(throwable -> Mono.just(42))
        .block();

Version 1 correctly catches the DUPLICATE KEY error and quietly resumes with 42.
Version 2 will log a lot of error messages:

2019-10-14 16:26:41.578 ERROR 42637 --- [actor-tcp-nio-7] reactor.core.publisher.Operators         : Operator called default onErrorDropped

io.r2dbc.postgresql.ExceptionFactory$PostgresqlDataIntegrityViolationException: duplicate key value violates unique constraint "foo_pkey"

...

2019-10-14 16:26:41.579  WARN 42637 --- [actor-tcp-nio-7] io.netty.util.ReferenceCountUtil         : Failed to release a message: PooledSlicedByteBuf(freed)

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)

...

2019-10-14 16:26:41.584 ERROR 42637 --- [actor-tcp-nio-7] r.n.channel.ChannelOperationsHandler     : [id: 0x5688bc88, L:/127.0.0.1:56515 - R:localhost/127.0.0.1:32913] Error was received while reading the incoming data. The connection will be closed.

Any ideas anyone?

Mark Paluch
@mp911de
@killaitis looks like the IllegalReferenceCountException kills the connection state. Which version are you using?
Andreas Killaitis
@killaitis
Latest BUILD-SNAPSHOT
Mark Paluch
@mp911de
Thanks a lot. Can you file a ticket in R2DBC Postgres?
TC-Kyle
@TC-Kyle
Hey-o, does anybody know why my connection might be hanging? I'm using r2dbc-postgresql version 1.0.0.M7 and spring-data-r2dbc version 1.0.0.RC1. Whenever I make a query, the connection seems to hang endlessly, giving me neither a result nor an error. Even simple queries like SELECT id FROM table will hang.
Mark Paluch
@mp911de
Please upgrade to R2DBC 0.8.0.RC2.
TC-Kyle
@TC-Kyle
@mp911de Huzzah, that worked! Thanks for the help.
Also, I managed to catch your talk at SpringOne. Great job! :D
Mark Paluch
@mp911de
Thanks a lot!