    Bert
    @HBO2

    the strange thing is: I am no longer able to connect to the pod with the Landoop Kafka Connect UI:

    Kafka Connect : http://10.1.9.254:8083 Kafka Connect UI Version : 0.9.7 CONNECTIVITY ERROR

    Jiri Pechanec
    @jpechane
    @HBO2 Well, maybe the problem is that it also records the data into topics in compressed form, so the consumer cannot read them? And in this case we are talking about the status and offsets topics too?
    Bert
    @HBO2
    @jpechane yes
    AJAY CHOWDHARY
    @AjayChowdhary
    curl -XPOST 'localhost:8083/connectors' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "name": "payment-connectors-data-migration-source-4",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.history.kafka.topic": "dbhistory.payments",
            "transforms": "unwrap",
            "include.schema.changes": "false",
            "table.whitelist": "transactions.payment",
            "decimal.handling.mode": "string",
            "database.history.kafka.recovery.poll.interval.ms": "1000",
            "transforms.unwrap.drop.tombstones": "false",
            "database.history.skip.unparseable.ddl": "true",
            "converters": "boolean,utcTimestampConverter",
            "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
            "errors.log.enable": "true",
            "database.user": "ixigo",
            "database.server.id": "111141",
            "database.history.kafka.bootstrap.servers": "localhost:9092",
            "boolean.type": "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter",
            "database.server.name": "dbserver1",
            "database.port": "3306",
            "database.hostname": "",
            "database.password": "",
            "database.history.store.only.monitored.tables.ddl": "true",
            "key.converter.schema.registry.url": "",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "false",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "utcTimestampConverter.type": "snapp.kafka.connect.util.UTCTimestampConverter"
        }
    }'
    I have added a custom converter, utcTimestampConverter, but the conversion is not taking place.
    Am I missing something in the config? I have added the jar file to the debezium-connector-mysql folder.
    // Imports assumed for this converter (Debezium SPI + Kafka Connect + JDK)
    import io.debezium.spi.converter.CustomConverter;
    import io.debezium.spi.converter.CustomConverter.ConverterRegistration;
    import io.debezium.spi.converter.RelationalColumn;
    import org.apache.kafka.connect.data.SchemaBuilder;

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Properties;
    import java.util.TimeZone;

    public class UTCTimestampConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

        private static final String UTC_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
        private static final String LOCAL_FORMAT = "yyyy-MM-dd'T'HH:mm:ss";
        private SimpleDateFormat utcFormatter, localFormatter;
    //    private List<String> timeFields;

        @Override
        public void configure(Properties props) {
            this.utcFormatter = new SimpleDateFormat(UTC_FORMAT);
            this.localFormatter = new SimpleDateFormat(LOCAL_FORMAT);
            this.utcFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
            this.localFormatter.setTimeZone(TimeZone.getDefault());
    //        this.timeFields = Arrays.asList(props.getProperty("fields").split(","));
        }

        @Override
        public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
            System.out.println("schema for " + column.name() + " , " + column.typeName());

            // Only re-registers a conversion for the created_date column; the timeFields-based
            // check is left commented out because the field above is commented out as well.
    //        boolean isTimeField = timeFields.stream().anyMatch(t -> t.equals(column.name()));
            if (column.name().equals("created_date")) {
                registration.register(SchemaBuilder.string().optional(), value -> {

                    String localTimestampStr = "";

                    if (value == null) {
                        if (column.isOptional()) {
                            return null;
                        }
                        else if (column.hasDefaultValue()) {
                            return column.defaultValue();
                        }
                        else {
                            return localTimestampStr;
                        }
                    }

                    try {
                        // Parse the incoming value as UTC and re-format it in the local time zone
                        Date utcDate = this.utcFormatter.parse(value.toString());
                        localTimestampStr = this.localFormatter.format(utcDate);
                    } catch (ParseException e) {
                        System.out.println("Exception :" + e);
                    }

                    return localTimestampStr;
                });
            }
        }
    }
    WilsonWsAuYeung
    @WilsonWsAuYeung

    Hey all.
    I'm trying to add a new table to my existing connector
    (table.whitelist, since I am on version 1.2):
    table.whitelist: public.table1, public.table2
    ->
    table.whitelist: public.table1, public.table2, public.table3

    I switched snapshot.mode back and forth from never to exported just to try it out.
    I see no publication tables being made in the db and I'm sort of stuck on where to debug.
    I tailed ALL the logs, and there's no indication of errors in the status or the logs.
    Is adding a new table even possible?
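    One common cause here, assuming the connector uses the pgoutput plug-in (the mention of publication tables suggests it does), is that the existing publication does not automatically pick up tables added to the whitelist later. A rough sketch for checking and fixing this, assuming Debezium's default publication name dbz_publication:

        -- list the tables currently covered by the publication
        SELECT * FROM pg_publication_tables WHERE pubname = 'dbz_publication';

        -- if public.table3 is missing, add it manually
        -- (only applies if the publication was not created FOR ALL TABLES)
        ALTER PUBLICATION dbz_publication ADD TABLE public.table3;

    Restarting the connector after that should let streaming pick up the new table; capturing its existing rows would still require a new snapshot.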

    lyq1198853167
    @lyq1198853167
    Hey man, I need help.
    bhushan
    @bhush558_twitter
    Hi, I would like to configure a connector in Strimzi Kafka to ship data from Kafka to PostgreSQL. Please share an example if you have one.
    lyq1198853167
    @lyq1198853167
    When I create a PostgreSQL 9.6 container with Docker and use Siddhi to capture the change data in Postgres (CDC), the code cannot connect, although Navicat can connect to it. I then wrote new code using plain JDBC to connect to the Postgres instance in Docker, and that code runs successfully.
    The Siddhi CDC code gives me this exception:
    Caused by: org.postgresql.util.PSQLException: FATAL: no pg_hba.conf entry for replication connection from host "10.0.2.115", user "postgres", SSL off, but I have configured pg_hba.conf with
    host all all all trust
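    In PostgreSQL, a line with "all" in the database column does not match replication connections, so "host all all all trust" does not cover the walsender connection that the CDC client opens. A minimal pg_hba.conf sketch (the address and auth method are illustrative; adjust them to your network and security requirements):

        # replication connections need their own entry; the "all" database keyword does not match them
        host    replication    postgres    10.0.2.115/32    trust

    After editing pg_hba.conf, reload the server configuration (e.g. SELECT pg_reload_conf();) for the change to take effect.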
    Bert
    @HBO2
    Hello @jpechane Many thanks for your help yesterday. In the end we solved it by finding the problematic record: a PNG of about 0.5 MB had been pasted into a text field. I assume the LENGTH function of MySQL reports the size as even bigger than 0.5 MB.
    Jiri Pechanec
    @jpechane
    @HBO2 Hi, thanks for letting us know. Sometimes one would not believe what data can be passed as input.
    Jacky Kwok
    @JackyKwok11_twitter

    I got the message below:
    org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs 6fd3bde1-6cb2-31bf-b1b8-5798cf0fa26f:1-43559799,eacc697a-9d75-3c32-9a19-8c10dcd674bf:1-2 and binlog file 'mysql-bin-changelog.000203', pos=34190196, skipping 84 events plus 3 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.

    debezium version: 1.0
    GTID enabled
    config:

            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
            "database.history.kafka.topic": "schema-changes",
            "transforms": "unwrap",
            "error.deadletterqueue.context.headers.enable": "true",
            "internal.key.converter.schemas.enable": "false",
            "include.schema.changes": "true",
            "table.whitelist": "product,brand",
            "decimal.handling.mode": "double",
            "database.history.skip.unparseable.ddl": "true",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "table.ignore.builtin": "true",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "database.whitelist": "xxxx",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "database.user": "kafka_connect",
            "database.server.id": "201614",
            "database.history.kafka.bootstrap.servers": "xxxx",
            "database.server.name": "xxx",
            "database.port": "3306",
            "enable.time.adjuster": "false",
            "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schema.registry.url": "xxxx",
            "database.ssl.mode": "disabled",
            "database.serverTimezone": "UTC",
            "database.hostname": "xxxx",
            "database.password": "xxxx",
            "internal.value.converter.schemas.enable": "false",
            "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "name": "xxxxx",
            "database.history.store.only.monitored.tables.ddl": "true",
            "key.converter.schema.registry.url": "xxxxx",
            "snapshot.mode": "initial"

    The connector encountered this error and stopped a while ago. However, the log shows it reconnected successfully:

    INFO Connected to MySQL binlog at rds.amazonaws.com:3306, starting at GTIDs 6fd3bde1-6cb2-31bf-b1b8-5798cf0fa26f:1-42761798,eacc697a-9d75-3c32-9a19-8c10dcd674bf:1-2 and binlog file 'mysql-bin-changelog.000199', pos=57062658, skipping 1 events plus 0 rows (io.debezium.connector.mysql.BinlogReader)

    I have checked on AWS RDS (MySQL), and that binlog is no longer available.
    Q1. Why is this happening?

    Jacky Kwok
    @JackyKwok11_twitter
    Q2. How can I prevent this from happening again?
    Q3. How can I fix this now?
    Thanks for your attention.
    Jiri Pechanec
    @jpechane
    @JackyKwok11_twitter Hi, Q1 and Q2 - Debezium was probably down for a while and the database has flushed the binary logs, so Debezium cannot resume from the last position seen
    @JackyKwok11_twitter Q3 - reconfigure the connector to use a snapshot when needed
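    For the MySQL connector this maps to the snapshot.mode setting; a minimal config fragment, assuming the rest of the connector configuration stays as it is:

        "snapshot.mode": "when_needed"

    With when_needed, the connector falls back to a new snapshot whenever the stored binlog position is no longer available on the server, instead of failing.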
    thejames97
    @thejames97
    I'm evaluating Debezium to handle CDC for a PostgreSQL DB and my solution requires transaction boundary markers. I see a reference to it in this blog post: https://debezium.io/blog/2020/02/11/debezium-1-1-beta1-released/ ...but I can't find any further information on it...like how to turn it on. Any guidance will be much appreciated.
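    The feature referenced in that blog post is exposed through the provide.transaction.metadata connector option (it is disabled by default); a minimal config fragment, assuming Debezium 1.1 or newer:

        "provide.transaction.metadata": "true"

    When enabled, the connector writes transaction BEGIN/END events to a dedicated transaction metadata topic and adds a transaction block to each change event, which is what the boundary markers refer to.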
    Jacky Kwok
    @JackyKwok11_twitter

    @JackyKwok11_twitter Q3 - reconfigure the connector to use a snapshot when needed

    @jpechane if snapshot.mode is changed to "when_needed", will the data be duplicated because the snapshot is taken again?

    Gandhi0210
    @Gandhi0210
    I am getting the following error when using the Debezium Oracle LogMiner connector:
    ERROR Mining session stopped due to the {} (io.debezium.connector.oracle.logminer.LogMinerHelper:517)
    java.lang.RuntimeException: Supplemental logging was not set. Use command: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
    ERROR Producer failure (io.debezium.pipeline.ErrorHandler:31)
    java.lang.RuntimeException: Supplemental logging was not set. Use command: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
    but supplemental logging is already turned on
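    One hedged way to double-check what Oracle itself reports, since "turned on" can mean minimal supplemental logging while the error asks for (ALL) COLUMNS:

        -- database-level supplemental logging flags
        SELECT supplemental_log_data_min, supplemental_log_data_all FROM v$database;

    If supplemental_log_data_all is NO, the ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS command from the error message (or per-table ALL COLUMNS supplemental logging, depending on the connector version) still needs to be run.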
    bhushan
    @bhush558_twitter
    Hi, is there any way I can get a JDBC sink connector for Postgres in Strimzi Kafka? Does Debezium provide it?
    tonysun0123
    @tonysun0123
    Hello Debezium developers! We need your help again! How should we write unit tests for an application that integrates the embedded Debezium engine? I found it is not easy, especially for the part of the code that is only triggered by database changes. Please give us some help, thanks!
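    One approach that tends to work (a sketch, not an official Debezium recipe) is to factor the change-handling logic out of the DebeziumEngine callback and unit-test that handler with hand-built Kafka Connect SourceRecord objects, keeping full engine-plus-database runs for integration tests (e.g. with Testcontainers). The handler and record shape below are hypothetical and only mimic a simplified change-event envelope:

        import org.apache.kafka.connect.data.Schema;
        import org.apache.kafka.connect.data.SchemaBuilder;
        import org.apache.kafka.connect.data.Struct;
        import org.apache.kafka.connect.source.SourceRecord;

        import java.util.Collections;

        // Hypothetical handler extracted from the engine's notifying(...) callback,
        // so the business logic can be tested without a running engine or database.
        class ChangeHandler {
            String handle(SourceRecord record) {
                Struct value = (Struct) record.value();
                return value.getString("op") + ":" + value.getStruct("after").getString("name");
            }
        }

        public class ChangeHandlerTest {
            public static void main(String[] args) {
                // Build a value that mimics the shape of a change event (simplified, not the full envelope).
                Schema rowSchema = SchemaBuilder.struct().field("name", Schema.STRING_SCHEMA).build();
                Schema valueSchema = SchemaBuilder.struct()
                        .field("op", Schema.STRING_SCHEMA)
                        .field("after", rowSchema)
                        .build();

                Struct after = new Struct(rowSchema).put("name", "alice");
                Struct value = new Struct(valueSchema).put("op", "c").put("after", after);

                SourceRecord record = new SourceRecord(
                        Collections.singletonMap("server", "test"),   // source partition
                        Collections.singletonMap("lsn", 1L),          // source offset
                        "test.public.person",                         // topic
                        valueSchema, value);

                ChangeHandler handler = new ChangeHandler();
                System.out.println(handler.handle(record));           // expected: c:alice
            }
        }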
    Pooja Kashyap
    @PoojaKa65094558_twitter
    Hi Team, greetings of the day! One of my clients is looking at the Debezium tool for 10 users; please help me out with the commercial details.
    Cory Forward
    @c4wrd
    Is there a supported way to perform an initial snapshot of only specific tables? From what I understand my options now are to (1) change the Kafka Connect app name, giving new offsets and treating it as if it were a new Debezium connector being deployed (any potential side effects if I don't remove any of the existing Debezium schema history topics?) or (2) manually delete the offsets for my existing connector task and re-deploy the connector... either way it's going to do an initial snapshot for every table, right?
    lyq1198853167
    @lyq1198853167
    Hi, when I use the Debezium connector to capture change data from UPDATE and DELETE statements, how should I configure it? I get this message:
    [debezium-postgresconnector-10.0.5.173_5432-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresChangeRecordEmitter - Different column count 0 present in the server message as schema in memory contains 2; refreshing table schema
    [debezium-postgresconnector-10.0.5.173_5432-change-event-source-coordinator] WARN io.debezium.connector.postgresql.PostgresChangeRecordEmitter - no new values found for table '{ key : null, value : {"name" : "_0.0.5.173_5432.public.person.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{ ....... table=person,txId=604,lsn=22139760}'; skipping record
    lyq1198853167
    @lyq1198853167
    This is PostgreSQL CDC.
    lyq1198853167
    @lyq1198853167
    Why is there no new value found for the table, when I really did update the row with a new value?
    Actually, UPDATE and DELETE should capture the old values.
    mark teehan
    @markteehan

    Hello! I'm experiencing a consistent problem: the SQL Server connectors will not do an initial snapshot, so no topics are created and streaming never starts. The log entry always has "sourceTime=null". The same databases did initial snapshots successfully 2-3 months ago; now I am attempting to redo the snapshots for all systems. Is "sourceTime=null" an indicator of a problem?

    message:Snapshot ended with SnapshotResult [status=COMPLETED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=xxxOLTP6, changeLsn=NULL, commitLsn=00000045:0000e928:0001, eventSerialNo=null, snapshot=FALSE, sourceTime=null], partition={server=xxxOLTP6}, snapshotCompleted=true, eventSerialNo=1]]

    Assaf Liebstein
    @liebstein
    Hey everybody.
    Can anyone restart the Docker image build of 1.4.0.Final?
    https://travis-ci.org/github/debezium/docker-images/builds/753331077
    ant0nk
    @ant0nk
    Hello, how can I find the full list of supported data types? For example, I added a RAW column to an Oracle table and it was successfully replicated as BYTES, but there is no RAW type in the documentation of the connector.
    Chuong Nguyen
    @ChuongTNguyen_twitter

    Hello, I am getting the "Socket is closed" error in what is essentially a never-ending loop. The error is referenced here but the context is different: https://gitter.im/debezium/user?at=5fad1ee1b86f64070447c7a3

    AWS RDS Postgres 11
    1 connector
    pgoutput
    Debezium version 1.4.0.Final

    After reading the messages referencing the issue, I see that the cause is some networking/CPU spike or high IOPS usage. In my case, I see it with high IOPS usage (>10k), which leads to high networking volume. CPU usage is around 30% during these times.

    When this error happens, the connector will auto-reconnect thanks to (v1.1.0.CR1). However, after it restarts, it needs to replay the LSNs starting from the restart_lsn (if I'm not mistaken). While this is happening, the read IOPS used by the connector is around 10k and the socket error happens almost every 20 minutes. By this time it has not fully caught up to the confirmed LSN, and the restart_lsn has not moved forward. It auto-reconnects and restarts from the beginning... This loop keeps happening and we will never be able to catch up. My only resolution right now is to drop the slot. This is not ideal.

    Note that during the time Connect fails initially, there are many write IOPS from other sources. This is why it might fall behind while restarting. However, when we terminate the jobs from other sources that write to the database, we see that 10k read IOPS are used while Connect is trying to catch up and this failure loop happens. I have confirmed that the read IOPS are associated with the connector. We have 16k provisioned IOPS.

    If we cannot solve the "Socket is closed" error, is there a way to have Connect get out of this loop?

    Mohammad Mirzaeyan
    @mmirzaeyan
    @Naros
    ant0nk
    @ant0nk
    Hello, how do I properly hide the password in the config? I tried to use "database.password" : "${file:/data/connector.properties:dbPassword}" inside the JSON handed to Connect, like this: curl -X PUT http://localhost:8083/connectors/inventory-connector/config -H "Content-Type: application/json" -d @register-oracle-update.json, but it does not work - the Connect log reports an invalid password.
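    The ${file:...} placeholder only resolves if the Connect worker itself has a config provider enabled; a minimal sketch of the worker properties that would need to be present (an assumption about the setup, since the worker config is not shown here):

        # connect worker configuration (e.g. connect-distributed.properties)
        config.providers=file
        config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

    The referenced file (/data/connector.properties) then needs a dbPassword=... entry and must be readable by the worker process; otherwise the literal placeholder string is passed to the connector as the password.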
    Martin Perez
    @mpermar_gitlab

    Hello folks.

    Is it fair to say that in the Oracle Connector, Flashback capabilities are only needed to perform the initial snapshot?

    tims0n777
    @tims0n777

    Hello! After connecting to Oracle 12.1, the connector sends DDL to the topic, but then stops with an error. What could be wrong?

    [2021-01-18 07:37:25,786] TRACE Class 'oracle.streams.XStreamOut' not found. Delegating to parent (org.apache.kafka.connect.runtime.isolation.PluginClassLoader:100)
    [2021-01-18 07:37:25,823] ERROR Producer failure (io.debezium.pipeline.ErrorHandler:31)
    oracle.streams.StreamsException: ORA-21560: argument last_position is null, invalid, or out of range

    at oracle.streams.XStreamOut.XStreamOutAttachNative(Native Method)
    at oracle.streams.XStreamOut.attachInternal(XStreamOut.java:373)
    at oracle.streams.XStreamOut.attach(XStreamOut.java:343)
    at io.debezium.connector.oracle.xstream.XstreamStreamingChangeEventSource.execute(XstreamStreamingChangeEventSource.java:70)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:140)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:113)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

    [2021-01-18 07:37:25,834] INFO Finished streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:141)
    [2021-01-18 07:37:25,835] INFO Connected metrics set to 'false' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:60)
    [2021-01-18 07:37:25,977] DEBUG checking for more records... (io.debezium.connector.base.ChangeEventQueue:179)
    [2021-01-18 07:37:25,979] INFO WorkerSourceTask{id=debezium-ora-002-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)
    [2021-01-18 07:37:25,979] INFO WorkerSourceTask{id=debezium-ora-002-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
    [2021-01-18 07:37:25,980] DEBUG WorkerSourceTask{id=debezium-ora-002-0} Finished offset commitOffsets successfully in 0 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:444)
    [2021-01-18 07:37:25,980] ERROR WorkerSourceTask{id=debezium-ora-002-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
    org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.oracle.xstream.XstreamStreamingChangeEventSource.execute(XstreamStreamingChangeEventSource.java:82)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:140)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:113)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: oracle.streams.StreamsException: ORA-21560: argument last_position is null, invalid, or out of range

    at oracle.streams.XStreamOut.XStreamOutAttachNative(Native Method)
    at oracle.streams.XStreamOut.attachInternal(XStreamOut.java:373)
    at oracle.streams.XStreamOut.attach(XStreamOut.java:343)
    at io.debezium.connector.oracle.xstream.XstreamStreamingChangeEventSource.execute(XstreamStreamingChangeEventSource.java:70)
    ... 7 more

    [2021-01-18 07:37:25,982] ERROR WorkerSourceTask{id=debezium-ora-002-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
    [2021-01-18 07:37:25,982] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask:192)

    Stevan Milic
    @stevanmilic
    Hey people, we are trying to update the configuration from *.whitelist to *.include.list introduced in version 1.3, but when we try to PUT the config through the REST API we receive a 400, and the Debezium logs show that a slot with the same name already exists. It looks like Debezium tries to re-create the slot, although no schema/table has been added to the *.include.list. Is there a workaround, or would the only solution be to create a different slot?
    Martin Perez
    @mpermar_gitlab

    Hey, another one. I'm trying to figure out why a connector might have stopped producing data. The database is quite heavy on writes. I can see some redo log switching after 15 min or so. I don't see any errors in the log, but with debug logging turned on I can see the lag increasing; it was around ~10 seconds when all of a sudden the connector stopped processing data, again with no error reported.

    Could it be the increased lag?

    lbernal
    @lbernal
    Hey people, do you know why the connector for SQL Server (Change Data Capture) doesn't support the net changes mode? I mean, from looking at the code it seems that only the "cdc.fn_cdc_get_all_changes_<capture_instance>" functions are called for any particular capture instance, but no call can be found for the "cdc.fn_cdc_get_net_changes_<capture_instance>" functions. In summary, I would like to capture the net changes from one particular capture instance, but it apparently isn't supported. I'm not a Java programmer, but at first glance it looks like a pretty easy change in the code to support such functionality, so I'm wondering why it hasn't been implemented yet. Would you please advise?
    Fabricio Giordani
    @fgiordani_gitlab
    Hi, I'm connecting Debezium to an AWS Aurora Postgres database for CDC, and when I run "select pg_wal_lsn_diff(sent_lsn, write_lsn) as write_lag from pg_stat_replication;" it returns "-3762382224". When I tested against a plain Postgres database it works, but when I point the Debezium configuration at my database (on the same server) it isn't working.
    Mike Cowgill
    @mikecowgill
    Hi all! I'm having an issue that I'm struggling to resolve. Using the PostgreSQL connector I'm getting the exception below for two tables (out of 231 tables). Looking at the resolveColumnsFromStreamTupleData method, it seems that short numberOfColumns = buffer.getShort(); is larger than table.columns(), which causes this exception, and the connector shuts down. I was able to get past the problem by adding the two offending tables to the table blacklist. This is all in development and no schema changes are being made to the tables after starting the connector.

    ERROR Producer failure (io.debezium.pipeline.ErrorHandler) [debezium-postgresconnector-encdb-change-event-source-coordinator]
    java.lang.IndexOutOfBoundsException: Index 309 out of bounds for length 309
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
    at java.base/java.util.Objects.checkIndex(Objects.java:372)
    at java.base/java.util.ArrayList.get(ArrayList.java:459)
    at java.base/java.util.Collections$UnmodifiableList.get(Collections.java:1310)
    at java.base/java.util.Collections$UnmodifiableList.get(Collections.java:1310)
    at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.resolveColumnsFromStreamTupleData(PgOutputMessageDecoder.java:561)
    at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.decodeInsert(PgOutputMessageDecoder.java:379)
    at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:191)
    at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:42)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:480)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:472)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.searchWalPosition(PostgresStreamingChangeEventSource.java:271)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:131)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:140)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:113)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)