Please join the Debezium community on Zulip (https://debezium.zulipchat.com). This room is not used any longer.
Hi All,
We are facing a problem while inserting MongoDB records into BigQuery via Debezium. It happens when the order of fields changes between JSON documents, e.g. {"fields1": 1, "fields":{"field2": 2, "field3": "test"}} and {"fields1": 1, "fields":{"field3": "test", "field2": 2}}
As you can see, the nested object has a different field order in each document.
When this happens, the records reach the BigQuery merge tables but fail with an error while being merged and inserted into the final table, because of the changed field order.
Please help me if you have faced this issue before.
Hi team, I had an outage with Debezium that took a long time to fix, and I ran into the problem described in the documentation: Debezium needs PostgreSQL's WAL to be kept during Debezium outages. If your WAL retention is too small and the outage too long, then Debezium will not be able to recover after a restart, as it will miss part of the data changes. The usual indicator is an error similar to this thrown during startup: ERROR: requested WAL segment 000000010000000000000001 has already been removed.
When this happens it is necessary to re-execute the snapshot of the database. We also recommend setting the parameter wal_keep_segments = 0. Please follow the official PostgreSQL documentation for fine-tuning WAL retention.
My question is: how can I re-execute the snapshot of the database? I have tried several options and changed "snapshot.mode", but I always receive the same error: "SQLException: ERROR: requested WAL segment 000000010000082D0000000D has already been removed". Can anyone help me, please?
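For what it's worth, one way this is usually handled (a hedged sketch, not official guidance, and every name and value below is a placeholder): the error keeps coming back because the existing replication slot still points at WAL that is gone, and the stored connector offsets are reused on every restart. Re-registering the connector under a new connector name with a new slot.name means neither the old offsets nor the stale slot are reused, so the connector takes a fresh initial snapshot; the old slot should then be dropped so it stops retaining WAL. Whether re-snapshotting like this is acceptable also depends on your downstream consumers, since all rows are emitted again.
{
  "name": "my-pg-connector-v2",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "<postgres-host>",
    "database.port": "5432",
    "database.user": "<user>",
    "database.password": "<password>",
    "database.dbname": "<dbname>",
    "database.server.name": "<server-name>",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot_v2",
    "snapshot.mode": "initial"
  }
}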
connect | 2022-05-16 03:33:55,790 ERROR || Stopping due to error [org.apache.kafka.connect.cli.ConnectDistributed]
connect | org.apache.kafka.common.config.ConfigException: Invalid value io.apicurio.registry.utils.converter.AvroConverter for configuration key.converter: Class io.apicurio.registry.utils.converter.AvroConverter could not be found.
connect | at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728)
connect | at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
connect | at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
connect | at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
connect | at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
connect | at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:385)
connect | at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:379)
connect | at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:93)
connect | at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Hi everyone, I'm getting the following error with a MySQL connector:
Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
Has anyone seen this before? Could this be the cause of my MySQL connector suddenly not working?
Hi all, I'm using the Debezium Oracle source connector with Avro. The goal is to fetch data and schema and move them to another Oracle DB with exactly the same schema.
Unfortunately I have a special character in a column name and Avro doesn't like it.
I tried the following:
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "COL_VAL#:COL_VAL_",
but it doesn't work, still failing with org.apache.avro.SchemaParseException: Illegal character in: COL_VAL#
Could you please advise?
Below is an excerpt from the schema (I can attach the whole thing if needed):
{
"connect.name": "my_topic.Envelope",
"fields": [
{
"default": null,
"name": "before",
"type": [
"null",
{
"connect.name": "my_topic.Value",
"fields": [
{
"name": "COL_VAL_1",
"type": "string"
},
{
"name": "COL_VAL_CNT,
"type": "string"
},
{
"name": "COL_VAL#",
"type": "string"
}
],
"name": "Value",
"type": "record"
}
]
},
{
"default": null,
"name": "after",
"type": [
"null",
"Value"
]
},
{
"name": "source",
"type": {
"connect.name": "io.debezium.connector.oracle.Source",
"fields": [
...cut...
],
"name": "Source",
"namespace": "io.debezium.connector.oracle",
"type": "record"
}
},
{
"name": "op",
"type": "string"
},
{
"default": null,
"name": "ts_ms",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "transaction",
"type": [
"null",
{
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "total_order",
"type": "long"
},
{
"name": "data_collection_order",
"type": "long"
}
],
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"type": "record"
}
]
}
],
"name": "Envelope",
"namespace": "my_topic",
"type": "record"
}
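One thing that might help here (a hedged suggestion, not a guaranteed fix): Debezium has a sanitize.field.names connector option that replaces characters that are not valid in Avro names with underscores, so the source connector itself would emit COL_VAL_ instead of COL_VAL#. I'm not sure why the ReplaceField rename didn't take effect in your pipeline, but the sanitizing happens inside the connector, before the schema ever reaches the Avro converter. Something like this added to the source connector config (only the sanitize key is the point, the rest of your config stays as it is):
"sanitize.field.names": "true"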
Unable to find fields [SinkRecordField{schema=Schema{STRING}, name='test', isPrimaryKey=false}] among column names [id, sum_cost, transaction_ts] [io.confluent.connect.jdbc.sink.DbStructure]
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics.regex": "sink_db.public.(.*)",
"connection.url": "jdbc:postgresql://db_slave:5432/sink_db?user=postgresuser&password=postgrespw",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields.regex": "(.*)id",
"pk.mode": "record_key"
}
}
{
"name": "pg-get-data-connector",
"config":{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "db_master",
"database.port": 5432,
"database.user": "postgresuser",
"database.password": "postgrespw",
"database.dbname" : "db_master",
"database.server.name": "sink_db",
"plugin.name": "pgoutput",
"table.include.list": "public.(.*)",
"poll.interval.ms": "1000",
"snapshot.mode":"always",
"schema.whitelist": "public"
}
}
Hi Team,
We are using the Debezium Db2 connector and we have a CLOB column. While streaming the records from the DB to the Kafka topic we are getting com.ibm.db2.jcc.am.c_@2d336dda instead of the column value. Can anyone help me resolve this?
Can anyone please help me with this? I am stuck here.
Good afternoon Team!
At the moment we are using Debezium MySQL against Aurora.
GTID mode in MySQL is set to OFF_PERMISSIVE, so it is considered enabled according to this check in MySqlConnection#isGtidModeEnabled:
return !"OFF".equalsIgnoreCase(rs.getString(2));
- so it considers anything other than OFF to be ON.
Upon restarting the process we eventually end up with this error: The replication sender thread cannot start in AUTO_POSITION mode: this server has GTID_MODE = OFF_PERMISSIVE instead of ON
In other words, Debezium considers OFF_PERMISSIVE as enabled, but in order to make progress it eventually needs it to be ON (transitively, as the error comes from the SQL client library).
So basically (at least from what I can tell) we can't move forward from this point without changing the configuration on Aurora.
If I'm wrong on this - please correct me, would be happy!
I believe this situation could be avoided if the use of GTIDs were configurable on the Debezium side (it's an optimization, after all, not something Debezium requires in order to work).
Has anyone encountered this issue before?
Hi, can anyone share how Debezium maintains its Postgres connection?
The documentation covers the CDC logic, whereas I want to know how the connection between the connector and the Postgres host is established.
More along the lines of: is it a connection pool, and does the connector use a heartbeat signal to determine whether the RDS instance is up?
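Not an authoritative answer, but on the heartbeat part: as far as I know the connector keeps dedicated connections (a JDBC connection plus the logical replication stream) rather than a pool, and it exposes a heartbeat.interval.ms option that makes it emit periodic heartbeat events. The Postgres connector additionally supports heartbeat.action.query to run a small statement against the database on each heartbeat. A minimal sketch; the heartbeat table name below is made up and you would need to create it yourself:
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "INSERT INTO debezium_heartbeat (ts) VALUES (now())"
The heartbeat is mainly there to keep offsets and the replication slot advancing on low-traffic databases rather than to health-check RDS, but a stalled heartbeat is a usable signal that the connection is down.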
I have the following setup
Oracle -> Kafka -> PostgreSQL
Source Connector config is
{
"name":"myfirst-connector",
"config":{
"connector.class":"io.debezium.connector.oracle.OracleConnector",
"tasks.max":"1",
"database.hostname":"192.168.29.102",
"database.port":"1521",
"database.user":"c##dbzuser",
"database.password":"dbz",
"database.dbname":"ORCLCDB",
"database.pdb.name":"ORCLPDB1",
"database.server.name":"oracle19",
"database.connection.adapter":"logminer",
"database.history.kafka.topic":"schema_changes",
"database.history.kafka.bootstrap.servers":"192.168.29.102:9092",
"database.tablename.case.insensitive":"true",
"snapshot.mode":"initial",
"tombstones.on.delete":"true",
"include.schema.changes": "true",
"sanitize.field.names":"true",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"true",
"time.precision.mode": "connect",
"database.oracle.version":19
}
}
Sink connector config is
{
"name": "myjdbc-sink-testdebezium",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "oracle19.C__DBZUSER.*",
"connection.url": "jdbc:postgresql://192.168.29.102:5432/postgres?user=puser&password=My19cPassword",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"true",
"dialect.name": "PostgreSqlDatabaseDialect",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"transforms": "unwrap, RemoveString, TimestampConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "none",
"transforms.RemoveString.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.RemoveString.regex": "(.*)\\.C__DBZUSER\\.(.*)",
"transforms.RemoveString.replacement": "$2",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "dob",
"pk.mode": "record_key"
}
}
Now, when I drop a table in Oracle I get an entry in the schema_changes topic, but the table is not dropped from PostgreSQL. I need help figuring out why the DROP is not getting propagated. Just FYI, all the other operations, i.e. CREATE TABLE, ALTER TABLE, INSERT, UPDATE and DELETE, are working fine. Only DROP is not working, and I am not getting any exception either.
Hello, I'm trying to set up Debezium with Oracle and I'm following this guide: https://github.com/debezium/debezium-examples/tree/main/tutorial#using-oracle
I have the various components running (Oracle, Kafka, etc) and when I try to register the Debezium Oracle connector I get this error:
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nUnable to connect: Failed to resolve Oracle database version\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.server.name": "server1",
"database.hostname": "localhost",
"database.port": "1521",
"database.user": "c##dbzuser",
"database.password": "dbz",
"database.dbname": "ORCLCDB",
"database.pdb.name": "ORCLPDB1",
"database.connection.adapter": "logminer",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
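In case it helps: "Failed to resolve Oracle database version" usually just means the connector could not reach the database at all, so the first thing to check is connectivity from the Kafka Connect worker. If Connect runs in a container (as in the linked tutorial), "localhost" resolves to the Connect container itself rather than to the Oracle host. A hedged sketch of the relevant change, where "oracle" stands for whatever hostname the Oracle container is reachable under from the Connect container (the actual name depends on your Docker/compose setup):
"database.hostname": "oracle"
It is also worth confirming that the Oracle instance has finished starting up and that port 1521 is reachable from inside the Connect container before re-registering the connector.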