Please join the Debezium community on Zulip (https://debezium.zulipchat.com). This room is not used any longer.
connect | 2022-05-16 03:33:55,790 ERROR || Stopping due to error [org.apache.kafka.connect.cli.ConnectDistributed]
connect | org.apache.kafka.common.config.ConfigException: Invalid value io.apicurio.registry.utils.converter.AvroConverter for configuration key.converter: Class io.apicurio.registry.utils.converter.AvroConverter could not be found.
connect | at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728)
connect | at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
connect | at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
connect | at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
connect | at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
connect | at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:385)
connect | at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:379)
connect | at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:93)
connect | at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Hi everyone, I'm getting the following error with a MySQL connector:
Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
Has anyone seen this before? Can this be the cause of my MySQL connector suddenly not working?
Hi all, I'm using the Debezium Oracle source connector with Avro. The goal is to fetch the data and schema and move them to another Oracle DB with exactly the same schema.
Unfortunately I have a special character in a column name, and Avro doesn't like it.
I tried the following:
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "COL_VAL#:COL_VAL_",
but it doesn't work; it still fails with org.apache.avro.SchemaParseException: Illegal character in: COL_VAL#
Could you please advise?
Below is an excerpt from the schema (I can attach the whole thing if needed):
{
"connect.name": "my_topic.Envelope",
"fields": [
{
"default": null,
"name": "before",
"type": [
"null",
{
"connect.name": "my_topic.Value",
"fields": [
{
"name": "COL_VAL_1",
"type": "string"
},
{
"name": "COL_VAL_CNT,
"type": "string"
},
{
"name": "COL_VAL#",
"type": "string"
}
],
"name": "Value",
"type": "record"
}
]
},
{
"default": null,
"name": "after",
"type": [
"null",
"Value"
]
},
{
"name": "source",
"type": {
"connect.name": "io.debezium.connector.oracle.Source",
"fields": [
...cut...
],
"name": "Source",
"namespace": "io.debezium.connector.oracle",
"type": "record"
}
},
{
"name": "op",
"type": "string"
},
{
"default": null,
"name": "ts_ms",
"type": [
"null",
"long"
]
},
{
"default": null,
"name": "transaction",
"type": [
"null",
{
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "total_order",
"type": "long"
},
{
"name": "data_collection_order",
"type": "long"
}
],
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"type": "record"
}
]
}
],
"name": "Envelope",
"namespace": "my_topic",
"type": "record"
}
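One option that might help, sketched below on the assumption that the Debezium Oracle connector version in use supports the sanitize.field.names option: when set to true, Debezium replaces characters that are not valid in Avro field names with underscores before the schema is generated, so COL_VAL# would be emitted as COL_VAL_ without needing the rename SMT. The connection details and names in this sketch are placeholders, not the original configuration:
{
  "name": "oracle-avro-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.hostname": "oracle-host",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "ORCLCDB",
    "database.pdb.name": "ORCLPDB1",
    "database.server.name": "server1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.oracle",
    "sanitize.field.names": "true"
  }
}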
Unable to find fields [SinkRecordField{schema=Schema{STRING}, name='test', isPrimaryKey=false}] among column names [id, sum_cost, transaction_ts] [io.confluent.connect.jdbc.sink.DbStructure]
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics.regex": "sink_db.public.(.*)",
"connection.url": "jdbc:postgresql://db_slave:5432/sink_db?user=postgresuser&password=postgrespw",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields.regex": "(.*)id",
"pk.mode": "record_key"
}
}
{
"name": "pg-get-data-connector",
"config":{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "db_master",
"database.port": 5432,
"database.user": "postgresuser",
"database.password": "postgrespw",
"database.dbname" : "db_master",
"database.server.name": "sink_db",
"plugin.name": "pgoutput",
"table.include.list": "public.(.*)",
"poll.interval.ms": "1000",
"snapshot.mode":"always",
"schema.whitelist": "public"
}
}
Hi Team,
We are using the Debezium DB2 connector and we have a CLOB column. While streaming the records from the DB to the Kafka topic we are getting com.ibm.db2.jcc.am.c_@2d336dda. Can anyone help me resolve this?
Can anyone please help me with this? I was stuck here.
Good afternoon Team!
At the moment we are using the Debezium MySQL connector against Aurora.
GTID mode in MySQL is set to OFF_PERMISSIVE, so it is considered enabled by this check in MySqlConnection#isGtidModeEnabled:
return !"OFF".equalsIgnoreCase(rs.getString(2));
- so it considers anything that is not OFF to be ON.
Upon restarting the process we eventually end up with the error: The replication sender thread cannot start in AUTO_POSITION mode: this server has GTID_MODE = OFF_PERMISSIVE instead of ON
In other words, Debezium considers OFF_PERMISSIVE as enabled, but in order to progress it eventually (transitively, as the error comes from an SQL client library) needs this to be ON.
So basically (at least from what I can tell) we can't move forward from this point without changing the configuration on Aurora.
If I'm wrong on this, please correct me - I would be happy to be wrong!
I believe this situation could be avoided if the usage of GTIDs were configurable on the Debezium side (it's an optimization after all, not something required for Debezium to work).
Has anyone encountered this issue before?
Hi, can anyone share how Debezium maintains the Postgres connection?
The documentation covers the CDC logic, whereas I want to know how the connection between the connector and the Postgres host is established.
More along the lines of: is it a pooled connection? Does the connector use a heartbeat signal to determine whether the RDS instance is up?
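On the heartbeat part of the question: Debezium connectors expose a heartbeat.interval.ms option (disabled by default) that makes the connector emit periodic heartbeat records. A minimal sketch of enabling it on a Postgres connector; the host, credentials, names, and interval value below are placeholders:
{
  "name": "postgres-heartbeat-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-host",
    "database.port": "5432",
    "database.user": "postgresuser",
    "database.password": "postgrespw",
    "database.dbname": "appdb",
    "database.server.name": "appserver",
    "plugin.name": "pgoutput",
    "heartbeat.interval.ms": "10000"
  }
}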
I have the following setup
Oracle -> Kafka -> PostgreSQL
Source Connector config is
{
"name":"myfirst-connector",
"config":{
"connector.class":"io.debezium.connector.oracle.OracleConnector",
"tasks.max":"1",
"database.hostname":"192.168.29.102",
"database.port":"1521",
"database.user":"c##dbzuser",
"database.password":"dbz",
"database.dbname":"ORCLCDB",
"database.pdb.name":"ORCLPDB1",
"database.server.name":"oracle19",
"database.connection.adapter":"logminer",
"database.history.kafka.topic":"schema_changes",
"database.history.kafka.bootstrap.servers":"192.168.29.102:9092",
"database.tablename.case.insensitive":"true",
"snapshot.mode":"initial",
"tombstones.on.delete":"true",
"include.schema.changes": "true",
"sanitize.field.names":"true",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"true",
"time.precision.mode": "connect",
"database.oracle.version":19
} }
Sink connector config is
{
"name": "myjdbc-sink-testdebezium",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "oracle19.C__DBZUSER.*",
"connection.url": "jdbc:postgresql://192.168.29.102:5432/postgres?user=puser&password=My19cPassword",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"true",
"dialect.name": "PostgreSqlDatabaseDialect",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"transforms": "unwrap, RemoveString, TimestampConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "none",
"transforms.RemoveString.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.RemoveString.regex": "(.*)\\.C__DBZUSER\\.(.*)",
"transforms.RemoveString.replacement": "$2",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "dob",
"pk.mode": "record_key"
}
}
Now when I drop a table in Oracle I get an entry in the schema_changes topic, but the table is not dropped from PostgreSQL. I need help figuring out why the drop is not being propagated. Just FYI, all the other operations, i.e. CREATE TABLE, ALTER TABLE, INSERT, UPDATE and DELETE, are working fine. Only DROP is not working, and I am not getting any exception either.
Hello, I'm trying to set up Debezium with Oracle and I'm following this guide: https://github.com/debezium/debezium-examples/tree/main/tutorial#using-oracle
I have the various components running (Oracle, Kafka, etc) and when I try to register the Debezium Oracle connector I get this error:
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nUnable to connect: Failed to resolve Oracle database version\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.server.name": "server1",
"database.hostname": "localhost",
"database.port": "1521",
"database.user": "c##dbzuser",
"database.password": "dbz",
"database.dbname": "ORCLCDB",
"database.pdb.name": "ORCLPDB1",
"database.connection.adapter": "logminer",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
I have a connector that finishes the snapshot phase but then fails every time. I've been looking into this for a couple of hours and can't find the issue. Any ideas?
connect | [2022-07-10 13:19:44,310] INFO Snapshot ended with SnapshotResult [status=COMPLETED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=197, currentRowNumber=0, serverId=0, sourceTime=2022-07-10T11:19:44.308Z, threadId=-1, currentQuery=null, tableIds=[userTransactions.EUR], databaseName=userTransactions], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=1d8b8ba9-0042-11ed-b7a0-0242c0a89006:1-114, currentGtidSet=1d8b8ba9-0042-11ed-b7a0-0242c0a89006:1-114, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=197, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] (io.debezium.pipeline.ChangeEventSourceCoordinator)
connect | [2022-07-10 13:19:44,310] INFO Requested thread factory for connector MySqlConnector, id = userTransactions named = binlog-client (io.debezium.util.Threads)
connect | [2022-07-10 13:19:44,317] INFO Requested thread factory for connector MySqlConnector, id = userTransactions named = kafka-signal (io.debezium.util.Threads)
connect | [2022-07-10 13:19:44,317] ERROR Producer failure (io.debezium.pipeline.ErrorHandler)
connect | java.lang.NullPointerException
connect | at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
connect | at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
connect | at java.base/java.util.Properties.put(Properties.java:1340)
connect | at java.base/java.util.Properties.setProperty(Properties.java:228)
connect | at io.debezium.config.Configuration$Builder.withDefault(Configuration.java:687)
connect | at io.debezium.connector.mysql.signal.KafkaSignalThread.<init>(KafkaSignalThread.java:101)
connect | at io.debezium.connector.mysql.MySqlReadOnlyIncrementalSnapshotChangeEventSource.<init>(MySqlReadOnlyIncrementalSnapshotChangeEventSource.java:91)
connect | at io.debezium.connector.mysql.MySqlChangeEventSourceFactory.getIncrementalSnapshotChangeEventSource(MySqlChangeEventSourceFactory.java:91)
...
and then:
connect | [2022-07-10 13:19:45,074] ERROR WorkerSourceTask{id=Wallets-Debezium-Mysql-UserTransactions-V2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect | org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
connect | at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
connect | at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:116)
connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect | at java.base/java.lang.Thread.run(Thread.java:829)
connect | Caused by: java.lang.NullPointerException
connect | at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
connect | at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
connect | at java.base/java.util.Properties.put(Properties.java:1340)
connect | at java.base/java.util.Properties.setProperty(Proper
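A hedged guess rather than a confirmed diagnosis: the failure happens inside KafkaSignalThread, which is only created for read-only incremental snapshots and builds its consumer config from the connector's signal Kafka settings. A sketch of the related MySQL connector properties (values are placeholders; whether one of these is actually missing in this setup is an assumption):
{
  "read.only": "true",
  "signal.kafka.topic": "dbz-signals",
  "signal.kafka.bootstrap.servers": "kafka:9092"
}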
I'm trying to use the MongoDB outbox pattern with the payload as a byte[]. I'm using the quarkus-avro library, which converts my userEvent.avsc file into a UserEvent.java class with a built-in encoder that returns the byte[] for the class. I use a mapper to convert my original User.java instance into the UserEvent.java instance, then I convert that into a byte[] and store it as the payload in the outbox table in MongoDB.
When I use Debezium with the io.debezium.converters.ByteArrayConverter value converter I get an alphanumeric string in Kafka. When my consumer tries to consume this as a byte[], it throws an error like "unidentified hex value".
I have other consumers that read from the Kafka topic and have configs that accept an Avro schema and a byte[] and create the Java instance out of it. However, the byte sequence seems to differ after going through MongoDB: when I print the encoded byte[] vs. the one I read from Kafka, they are different. Any idea what to do here? The idea is that the Avro byte[] should be passed as-is to Kafka, so consumers can read it in a decode function.
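For reference, a minimal sketch of the kind of source-connector setup described above, assuming the MongoDB outbox event router SMT (io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter) is available in the Debezium version in use; the host, database, collection, and server names are placeholders:
{
  "name": "mongodb-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/mongodb:27017",
    "mongodb.name": "app",
    "collection.include.list": "appdb.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.debezium.converters.ByteArrayConverter"
  }
}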