Please join the Debezium community on Zulip (https://debezium.zulipchat.com). This room is not used any longer.
Hi, please help me. I get this error while using the Docker image debezium/server:latest to capture changes (CDC) from my Oracle database:
{
"exception": {
"refId": 1,
"exceptionType": "java.lang.ClassNotFoundException",
"message": "io.debezium.connector.oracle.OracleConnector"
}
}
My application.properties:
debezium.sink.type=redis
debezium.sink.redis.address=redis:6379
debezium.sink.redis.batch.size=10
debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.server.name=server1
debezium.source.database.hostname=10.11.12.12
debezium.source.database.port=1521
debezium.source.database.user=dbuser
debezium.source.database.password=dbP@ssw0rd
debezium.source.database.dbname=BON
debezium.source.database.out.server.name=dbzxout
debezium.source.database.connection.adapter=logminer
debezium.source.database.tablename.case.insensitive=true
debezium.source.table.include.list=lookup.branch_test_limited,lookup.branch_test,pick.pick_test
debezium.source.database.oracle.version=12+
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
quarkus.log.console.json=true
I solved this by mounting the JARs from the Oracle connector tutorial into /debezium/lib inside the Docker container. But now I get this error:
"exception": {
"refId": 1,
"exceptionType": "java.lang.NoSuchFieldError",
"message": "INTERNAL_CONNECTOR_CLASS",
"frames": [
{
"class": "io.debezium.storage.kafka.history.KafkaDatabaseHistory",
"method": "<clinit>",
"line": 169
},
{
"class": "io.debezium.storage.kafka.history.KafkaStorageConfiguration",
"method": "validateServerNameIsDifferentFromHistoryTopicName",
"line": 17
},
{
"class": "io.debezium.config.Field$Validator",
"method": "lambda$and$0",
"line": 232
},
{
"class": "io.debezium.config.Field",
"method": "validate",
"line": 640
},
{
"class": "io.debezium.config.Configuration",
"method": "validate",
"line": 1863
},
{
"class": "io.debezium.config.Configuration",
"method": "validateAndRecord",
"line": 1879
},
{
"class": "io.debezium.connector.common.BaseSourceTask",
"method": "start",
"line": 119
},
{
"class": "io.debezium.embedded.EmbeddedEngine",
"method": "run",
"line": 759
},
{
"class": "io.debezium.embedded.ConvertingEngineBuilder$2",
"method": "run",
"line": 192
},
{
"class": "io.debezium.server.DebeziumServer",
"method": "lambda$start$1",
"line": 150
},
{
"class": "java.util.concurrent.ThreadPoolExecutor",
"method": "runWorker",
"line": 1128
},
{
"class": "java.util.concurrent.ThreadPoolExecutor$Worker",
"method": "run",
"line": 628
},
{ "class": "java.lang.Thread", "method": "run", "line": 829 }
]
}
This happens even though I set debezium.source.database.history=io.debezium.server.redis.RedisDatabaseHistory.
Message from logs:
"Connector completed: success = 'false', message = 'Unable to initialize and start connector's task class 'io.debezium.connector.oracle.OracleConnectorTask' with config: {connector.class=io.debezium.connector.oracle.OracleConnector, debezium.sink.redis.batch.size=10, database.history.redis.address=redis:6379, database.tablename.case.insensitive=true, database.history.redis.ssl.enabled=false, offset.storage.file.filename=data/offsets.dat, database.out.server.name=dbzxout, database.oracle.version=11, value.converter=org.apache.kafka.connect.json.JsonConverter, key.converter=org.apache.kafka.connect.json.JsonConverter, database.user=XXXXUSER1, database.dbname=BON, offset.storage=io.debezium.server.redis.RedisOffsetBackingStore, debezium.sink.type=redis, debezium.sink.redis.address=redis:6379, database.connection.adapter=logminer, database.server.name=server1, offset.flush.timeout.ms=5000, database.port=1521, offset.flush.interval.ms=0, internal.key.converter=org.apache.kafka.connect.json.JsonConverter, database.hostname=10.11.12.12, database.password=********, name=redis, internal.value.converter=org.apache.kafka.connect.json.JsonConverter, table.include.list=lookup.branch_test_limited,lookup.branch_test,pick.pick_test, database.history=io.debezium.server.redis.RedisDatabaseHistory}', error = '{}'".
The line database.history=io.debezium.server.redis.RedisDatabaseHistory appears in that config dump, meaning my config is loaded.
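A side note on the NoSuchFieldError: the io.debezium.storage.kafka package only exists in newer Debezium releases, while keys such as database.server.name and database.history are older-style names, so the mounted connector JARs and the server:latest image may well come from different Debezium releases; in Java, NoSuchFieldError is the classic symptom of mixed library versions on the classpath. A sketch of pinning the image and the mounted JARs to one version (the image tag and host paths are assumptions):

# pin the server image to the same release as the mounted connector JARs
docker run -d --name debezium-server \
  -v $PWD/conf:/debezium/conf \
  -v $PWD/oracle-connector-libs:/debezium/lib \
  debezium/server:1.9.5.Final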
Hi everyone. We use Debezium extensively to capture change data in MySQL and push it to a Kafka topic. Recently we have been facing some issues with Debezium. Can anyone explain why I got this exception last night:
org.apache.kafka.connect.errors.ConnectException: Client requested master to start replication from position > file size; the first event ‘mysql-bin-changelog.446422’ at 28051871, the last event read from ‘/rdsdbdata/log/binlog/mysql-bin-changelog.446422’ at 4, the last byte read from ‘/rdsdbdata/log/binlog/mysql-bin-changelog.446422’ at 4. Error code: 1236; SQLSTATE: HY000. at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:197) at io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1018) at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:950) at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580) at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825) at java.lang.Thread.run(Thread.java:748) Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Client requested master to start replication from position > file size; the first event ‘mysql-bin-changelog.446422’ at 28051871, the last event read from ‘/rdsdbdata/log/binlog/mysql-bin-changelog.446422’ at 4, the last byte read from ‘/rdsdbdata/log/binlog/mysql-bin-changelog.446422’ at 4. at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:914) ... 3 more
Hi, does anyone know how to fix this error? Can I tell Debezium to advance the binlog position, to force Debezium to keep running even if the source MySQL DB crashed and is missing data?
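If skipping the lost events is acceptable, one recovery sketch for the Debezium MySQL connector (one option, not the only one; manually editing the stored offsets is the alternative) is to let it re-snapshot when the recorded binlog position is no longer on the server:

# re-runs a snapshot when the stored offsets point at a binlog position the server no longer has
snapshot.mode=when_needed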
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204)
- locked <0x0000000080a0fa00> (a io.debezium.connector.base.ChangeEventQueue)
at io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169)
at io.debezium.pipeline.EventDispatcher$StreamingChangeRecordReceiver.changeRecord(EventDispatcher.java:408)
I'm using Debezium Server 1.9.5 with MySQL 5.7 and getting the following output in my logs. The number of messages seems restricted to 2048 per period, even though there have been many more than that.
Could you advise why this is so, and how to rectify it? This didn't happen with previous versions.
2022-10-03 09:52:27,571 INFO [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 2048 records sent during previous 00:00:11.176, last recorded offset: {transaction_id=null, ts_sec=1664785944, file=mysql-bin.006100, pos=57005053, gtids=674946ca-7b05-11e9-8e2b-42010a164904:1-89223667,888c30b8-7b04-11e9-9823-42010a164902:1-23726302,c178b930-79a7-11ea-b858-42010a16490c:1-262677191, row=3, server_id=2608013947, event=314257}
2022-10-03 09:53:41,624 INFO [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 2048 records sent during previous 00:01:14.053, last recorded offset: {transaction_id=null, ts_sec=1664785944, file=mysql-bin.006100, pos=57005053, gtids=674946ca-7b05-11e9-8e2b-42010a164904:1-89223667,888c30b8-7b04-11e9-9823-42010a164902:1-23726302,c178b930-79a7-11ea-b858-42010a16490c:1-262677191, row=2, server_id=2608013947, event=318692}
2022-10-03 09:56:25,997 INFO [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 2048 records sent during previous 00:02:44.373, last recorded offset: {transaction_id=null, ts_sec=1664785944, file=mysql-bin.006100, pos=57005053, gtids=674946ca-7b05-11e9-8e2b-42010a164904:1-89223667,888c30b8-7b04-11e9-9823-42010a164902:1-23726302,c178b930-79a7-11ea-b858-42010a16490c:1-262677191, row=2, server_id=2608013947, event=330885}
2022-10-03 10:01:42,514 INFO [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 2048 records sent during previous 00:05:16.517, last recorded offset: {transaction_id=null, ts_sec=1664786570, file=mysql-bin.006101, pos=27090386, gtids=674946ca-7b05-11e9-8e2b-42010a164904:1-89223667,888c30b8-7b04-11e9-9823-42010a164902:1-23726302,c178b930-79a7-11ea-b858-42010a16490c:1-262687895, row=1, server_id=2608013947, event=15070}
2022-10-03 10:12:27,812 INFO [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 2048 records sent during previous 00:10:45.298, last recorded offset: {transaction_id=null, ts_sec=1664786570, file=mysql-bin.006101, pos=27090386, gtids=674946ca-7b05-11e9-8e2b-42010a164904:1-89223667,888c30b8-7b04-11e9-9823-42010a164902:1-23726302,c178b930-79a7-11ea-b858-42010a16490c:1-262687895, row=4, server_id=2608013947, event=66576}
2022-10-03 10:33:42,496 INFO [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 2048 records sent during previous 00:21:14.684, last recorded offset: {transaction_id=null, ts_sec=1664786570, file=mysql-bin.006101, pos=27090386, gtids=674946ca-7b05-11e9-8e2b-42010a164904:1-89223667,888c30b8-7b04-11e9-9823-42010a164902:1-23726302,c178b930-79a7-11ea-b858-42010a16490c:1-262687895, row=3, server_id=2608013947, event=164121}
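For context, 2048 happens to be the connector's default max.batch.size, so each of those INFO lines reports one full batch; the growing intervals between them suggest a slow-draining pipeline rather than dropped records. A sketch of the knobs involved (values illustrative; the defaults are 2048 and 8192, Debezium Server prefix assumed):

debezium.source.max.batch.size=4096
debezium.source.max.queue.size=16384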
Hey, I am using Debezium version 2.0.0.Final, with matching versions of debezium-embedded and the connector. My connector config is as follows:
io.debezium.config.Configuration.create()
.with("name", "inventory-mysql-connector")
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with(EmbeddedEngine.OFFSET_STORAGE, "org.apache.kafka.connect.storage.MemoryOffsetBackingStore")
//.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
.with("offset.flush.interval.ms", 60000)
.with("database.hostname", dbHost)
.with("database.port", dbPort)
.with("database.user", dbUsername)
.with("database.password", dbPassword)
.with("database.dbname", dbName)
.with("database.include.list", dbName)
.with("include.schema.changes", "false")
.with("database.allowPublicKeyRetrieval", "true")
.with("database.server.id", 1)
.with("database.server.name", "inventory-mysql-db-server")
.with("schema.history", MemorySchemaHistory.class.getName())
.with("schema.history.file.filename", dbHistoryTempFile.getAbsolutePath())
.with("table.whitelist", "inventory")
.with(MySqlConnectorConfig.TOPIC_PREFIX,"mssql")
.build();
With this configuration I am getting the following error:
[ERROR] 2022-10-20 09:08:17.757 [pool-3-thread-1] KafkaSchemaHistory - The 'schema.history.internal.kafka.topic' value is invalid: A value is required
[ERROR] 2022-10-20 09:08:17.757 [pool-3-thread-1] KafkaSchemaHistory - The 'schema.history.internal.kafka.bootstrap.servers' value is invalid: A value is required
[INFO ] 2022-10-20 09:08:17.758 [pool-3-thread-1] BaseSourceTask - Stopping down connector
[INFO ] 2022-10-20 09:08:17.759 [pool-4-thread-1] JdbcConnection - Connection gracefully closed
[ERROR] 2022-10-20 09:08:17.760 [pool-3-thread-1] EmbeddedEngine - Unable to initialize and start connector's task class 'io.debezium.connector.mysql.MySqlConnectorTask' with config: {connector.class=io.debezium.connector.mysql.MySqlConnector, include.schema.changes=false, table.whitelist=inventory, topic.prefix=mssql, offset.storage.file.filename=/var/folders/vn/klcnhyb53tddp64f69fm1rtm0000gn/T/offsets_11260356475744786045.dat, errors.retry.delay.initial.ms=300, value.converter=org.apache.kafka.connect.json.JsonConverter, schema.history=io.debezium.relational.history.MemorySchemaHistory, key.converter=org.apache.kafka.connect.json.JsonConverter, database.allowPublicKeyRetrieval=true, database.dbname=beepkart_php_production, database.user=root, offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore, database.server.id=1, database.server.name=inventory-mysql-db-server, offset.flush.timeout.ms=5000, errors.retry.delay.max.ms=10000, database.port=3306, offset.flush.interval.ms=60000, errors.max.retries=-1, database.hostname=localhost, database.password=**, name=inventory-mysql-connector, schema.history.file.filename=/var/folders/vn/klcnhyb53tddp64f69fm1rtm0000gn/T/dbhistory_5219220079200975230.dat, database.include.list=*}
org.apache.kafka.connect.errors.ConnectException: Error configuring an instance of KafkaSchemaHistory; check the logs for details
at io.debezium.storage.kafka.history.KafkaSchemaHistory.configure(KafkaSchemaHistory.java:209) ~[debezium-storage-kafka-2.0.0.Final.jar:2.0.0.Final]
at io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig.getSchemaHistory(HistorizedRelationalDatabaseConnectorConfig.java:115) ~[debezium-core-2.0.0.Final.jar:2.0.0.Final]
I have tried multiple things but no luck. Can anyone tell me what the issue is and how to resolve it?
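For what it's worth, Debezium 2.0 renamed the database.history* options to schema.history.internal*, so a bare schema.history key is not recognized and the default KafkaSchemaHistory is picked up, which then demands the Kafka topic and bootstrap settings seen in the errors. A minimal sketch of the lines that would change under that assumption (inventory.orders is a hypothetical table name):

// "schema.history" alone is not a known key in 2.0; the internal prefix is required
.with("schema.history.internal", MemorySchemaHistory.class.getName())
// table.whitelist was removed in 2.0; table.include.list expects <database>.<table> entries
.with("table.include.list", "inventory.orders")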
Hello my fellow Debezium heroes. I come to you with a problem that is currently bothering my teammate and me. It is a combination of Avro and Debezium, and I am stuck:
We have a topic set up with an Avro schema auto-registered by the Debezium connector. Now, after a while, it appears that we need to change the datatype of one of the table fields from
Timestamp to TimestampTZ (we use Postgres).
Do you have any process/hints on how this change can be introduced in a compatible way? I am currently trying to use the Kafka Connect "rename field" transform (https://docs.confluent.io/platform/current/connect/transforms/replacefield.html#rename-a-field), but this does not seem to work with Debezium, right?
thanks in advance for the help :-)
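One thing worth noting, in case it helps: with the Debezium envelope still in place, the row columns sit inside the before/after structs, and ReplaceField only renames top-level fields, which may be why the transform appears to do nothing. A hedged sketch that unwraps the envelope first (the column name created_at is hypothetical):

transforms=unwrap,rename
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames=created_at:created_at_tz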
Hello Team,
I am trying to implement Debezium for Kafka using Debezium Server, following this method:
https://debezium.io/documentation/reference/stable/operations/debezium-server.html
Using this link I downloaded the binaries for Debezium Server.
The main configuration file is conf/application.properties.
+++
debezium.sink.type=kafka
debezium.sink.kafka.producer.bootstrap.servers=:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.Serializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.Serializer
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.tasks.max=1
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=**
debezium.source.database.port=1433
debezium.source.database.user=
debezium.source.database.password=
debezium.source.database.dbname=
debezium.source.database.server.name=
debezium.source.schema.include.list=inventory
quarkus.log.console.json=false
debezium.source.database.encrypt=false
debezium.source.database.history.producer.security.protocol=PLAINTEXT
debezium.source.database.history.kafka.bootstrap.servers=**:9092
debezium.source.database.history.kafka.topic=dbhistory.fulfillment
debezium.source.table.include.list=dbo.Users
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/FileDatabaseHistory.dat
+++
Using this config file, when I run the command ./run.sh I get this error:
+++
2022-11-03 12:43:01,818 INFO [org.apa.kaf.cli.pro.KafkaProducer] (main) [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
2022-11-03 12:43:01,818 INFO [org.apa.kaf.com.met.Metrics] (main) Metrics scheduler closed
2022-11-03 12:43:01,818 INFO [org.apa.kaf.com.met.Metrics] (main) Closing reporter org.apache.kafka.common.metrics.JmxReporter
2022-11-03 12:43:01,819 INFO [org.apa.kaf.com.met.Metrics] (main) Metrics reporters closed
2022-11-03 12:43:01,820 INFO [org.apa.kaf.com.uti.AppInfoParser] (main) App info kafka.producer for producer-1 unregistered
2022-11-03 12:43:01,858 ERROR [io.qua.run.Application] (main) Failed to start application (with profile prod): java.lang.NoSuchMethodException: org.apache.kafka.common.serialization.Serializer.<init>()
at java.base/java.lang.Class.getConstructor0(Class.java:3349)
at java.base/java.lang.Class.getDeclaredConstructor(Class.java:2553)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:353)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:395)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:430)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:415)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:366)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:274)
at io.debezium.server.kafka.KafkaChangeConsumer.start(KafkaChangeConsumer.java:60)
at io.debezium.server.kafka.KafkaChangeConsumer_Bean.create(KafkaChangeConsumer_Bean.zig:617)
at io.debezium.server.kafka.KafkaChangeConsumer_Bean.create(KafkaChangeConsumer_Bean.zig:633)
at io.debezium.server.DebeziumServer.start(DebeziumServer.java:115)
at io.debezium.server.DebeziumServer_Bean.create(DebeziumServer_Bean.zig:256)
at io.debezium.server.DebeziumServer_Bean.create(DebeziumServer_Bean.zig:272)
at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:96)
at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:29)
at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:26)
at io.quarkus.ar
+++
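A note on the root cause: org.apache.kafka.common.serialization.Serializer is an interface, so Kafka cannot instantiate it, which is exactly what the NoSuchMethodException for its constructor says. The config needs a concrete serializer class; a sketch (StringSerializer assumed here; ByteArraySerializer is the other common choice):

debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer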
@SurendraKumarSheshma
I am trying to implement Debezium and connect MS SQL Server to Kafka,
and I am getting this error:
+++
2022-11-04 09:16:29,536 INFO [org.apa.kaf.cli.pro.KafkaProducer] (main) [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
2022-11-04 09:16:29,537 INFO [org.apa.kaf.com.met.Metrics] (main) Metrics scheduler closed
2022-11-04 09:16:29,537 INFO [org.apa.kaf.com.met.Metrics] (main) Closing reporter org.apache.kafka.common.metrics.JmxReporter
2022-11-04 09:16:29,537 INFO [org.apa.kaf.com.met.Metrics] (main) Metrics reporters closed
2022-11-04 09:16:29,540 INFO [org.apa.kaf.com.uti.AppInfoParser] (main) App info kafka.producer for producer-1 unregistered
2022-11-04 09:16:29,656 ERROR [io.qua.run.Application] (main) Failed to start application (with profile prod): java.lang.NoSuchMethodException: org.apache.kafka.common.serialization.Serializer.<init>()
at java.base/java.lang.Class.getConstructor0(Class.java:3349)
at java.base/java.lang.Class.getDeclaredConstructor(Class.java:2553)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:392)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:401)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:436)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:421)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:386)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:274)
at io.debezium.server.kafka.KafkaChangeConsumer.start(KafkaChangeConsumer.java:60)
at io.debezium.server.kafka.KafkaChangeConsumer_Bean.create(Unknown Source)
at io.debezium.server.kafka.KafkaChangeConsumer_Bean.create(Unknown Source)
at io.debezium.server.DebeziumServer.start(DebeziumServer.java:122)
at io.debezium.server.DebeziumServer_Bean.create(Unknown Source)
at io.debezium.server.DebeziumServer_Bean.create(Unknown Source)
at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:111)
at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:35)
at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:32)
at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:26)
at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:32)
at io.quarkus.arc.impl.ClientProxies.getApplicationScopedDelegate(ClientProxies.java:19)
at io.debezium.server.DebeziumServer_ClientProxy.arc$delegate(Unknown Source)
at io.debezium.server.DebeziumServer_ClientProxy.arc_contextualInstance(Unknown Source)
at io.debezium.server.DebeziumServer_Observer_Synthetic_d70cd75bf32ab6598217b9a64a8473d65e248c05.notify(Unknown Source)
at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:323)
at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:305)
at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:73)
at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:130)
at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:99)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
at io.quarkus.runtime.Application.start(Application.java:101)
at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:103)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:67)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:41)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:120)
at io.debezium.server.Main.main(Main.java:15)
+++
If you have any idea, please help me resolve this.
I'm trying to capture the information_schema.tables and information_schema.routines tables from a MySQL DB with the Debezium source connector (1.9.5). I have table.ignore.builtin=true, but the connector seems to keep ignoring the tables from the metadata schema:
INFO snapshot continuing with database(s): [information_schema] (io.debezium.connector.mysql.MySqlSnapshotChangeEventSource:153)
INFO Snapshot step 3 - Locking captured tables [] (io.debezium.relational.RelationalSnapshotChangeEventSource:104)
INFO Snapshot step 4 - Determining snapshot offset (io.debezium.relational.RelationalSnapshotChangeEventSource:110)
INFO Read binlog position of MySQL primary server (io.debezium.connector.mysql.MySqlSnapshotChangeEventSource:274)
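A hedged guess at the configuration involved: table.ignore.builtin defaults to true and filters out built-in schemas such as information_schema, which would match the empty "Locking captured tables []" step. A sketch that disables the filter (assuming these tables can be snapshotted at all; information_schema is not written to the binlog, so streaming it is a separate question):

table.ignore.builtin=false
database.include.list=information_schema
table.include.list=information_schema.tables,information_schema.routines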
Hello! I'm using a Debezium connector against a Postgres Aurora RDS. Most of the time the replication slot lag is just a few bytes or KB, but then it starts to grow and can reach 30 GB in less than half an hour, which then takes hours to consume. Both the source ts_ms and the Debezium event ts_ms confirm that the connector only reads those messages hours later.
The Debezium connector does have a heartbeat. We have also tried increasing the queue and batch sizes without luck (we went back to the defaults), as well as extracting the tables with ingestion spikes into a separate connector.
Any suggestions on what else to check?
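One hedged thing to check: if the lag spikes line up with heavy writes to tables, or to databases on the same instance, that the connector does not capture, the slot cannot confirm an LSN and WAL accumulates; a heartbeat action query that writes to a captured table is the usual mitigation. A sketch (the debezium_heartbeat table is hypothetical):

heartbeat.interval.ms=10000
heartbeat.action.query=INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()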