    wangyg007
    @wangyg007
    I have the same question as you: I add a table to the Oracle connector's table.include.list, but when I change the db_history topic and restart, it goes missing.
    Anuraag Singh
    @anuraag.zolo_gitlab

    Hi All,

    We are facing a problem while inserting MongoDB records into BigQuery via Debezium. The issue arises when the sequence of fields in a JSON document changes, e.g. {"fields1": 1, "fields":{"field2": 2, "field3": "test"}} vs. {"fields1": 1, "fields":{"field3": "test", "field2": 2}}.
    As you can see, the nested object has a different field order.

    When this happens, records reach the BigQuery merge tables but throw an error while merging and inserting into the final table, because of the change in field order.

    Please help me if you have faced this issue before.

    wongster80
    @wongster80
    Hi, I'm trying to hit the "pause" API endpoint to pause the connector, but I'm getting a 405:
    curl -s XPUT "http://<remote_host>:8083/connectors/<connector_name>/pause"
    {"error_code":405,"message":"HTTP 405 Method Not Allowed"}
    sonyasania
    @sonyasaniaaa_twitter
    Hi all, I have an issue with the PostgreSQL connector. We have specified multiple tables in table.exclude.list, but that parameter does not work: when I consume a topic, the tables still appear. Can someone please help?
    Nicolas Garcia
    @nicoga97

    Hi team, I had an outage with Debezium that took a long time to fix, and I hit the problem described in the documentation:
    Debezium needs a PostgreSQL’s WAL to be kept during Debezium outages. If your WAL retention is too small and outages too long then Debezium will not be able to recover after restart as it will miss part of the data changes. The usual indicator is an error similar to this thrown during the startup: ERROR: requested WAL segment 000000010000000000000001 has already been removed. When this happens then it is necessary to re-execute the snapshot of the database. We also recommend to set parameter wal_keep_segments = 0. Please follow PostgreSQL official documentation for fine-tuning of WAL retention.

    My question is: how can I re-execute the snapshot of the database? I have tried several options and changed the "snapshot.mode", but I always receive the same error: "SQLException: ERROR: requested WAL segment 000000010000082D0000000D has already been removed". Can anyone help me, please?
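
    (For what it's worth, the usual recovery path is to discard the stale replication state and start fresh: stop the connector, drop its replication slot on Postgres, clear its stored offsets, for example by registering the connector under a new name, and restart with snapshot.mode=initial. A sketch of the slot cleanup, assuming the default slot name "debezium":)

    -- run on the Postgres server; the slot name is an assumption, check pg_replication_slots first
    SELECT slot_name, active FROM pg_replication_slots;
    SELECT pg_drop_replication_slot('debezium');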

    Artsiom Yudovin
    @ayudovin
    Hi, could someone help? I would like to detect when Debezium finishes loading the snapshot. What options do I have?
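    (One option is the connector's snapshot metrics, which are exposed over JMX and include a SnapshotCompleted attribute. A minimal sketch of reading it from inside the same JVM; the MBean name assumes a Postgres connector with server name "dbserver1", so adjust both to your setup:)

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class SnapshotCheck {
        public static void main(String[] args) throws Exception {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            // connector type ("postgres") and server name are assumptions, see the Debezium monitoring docs
            ObjectName snapshot = new ObjectName(
                    "debezium.postgres:type=connector-metrics,context=snapshot,server=dbserver1");
            Object completed = mbs.getAttribute(snapshot, "SnapshotCompleted");
            System.out.println("Snapshot completed: " + completed);
        }
    }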
    ahvahsky2008
    @ahvahsky2008
    Hi guys, I receive JdbcConnectionException: ERROR: permission denied
    What am I doing wrong?
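    (That wording looks like a Postgres error. If so, the connector's role usually needs LOGIN, REPLICATION, and SELECT on the captured tables; a sketch of the grants, with "debezium" as a placeholder role name:)

    ALTER ROLE debezium WITH LOGIN REPLICATION;
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;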
    vaibhav pandey
    @vaibhavpandey2000
    How can we generate ts_usec in place of ts_ms in the source object? Does anyone have any idea?
    Ronaldo Lanhellas
    @rlanhellas

    Hello guys, I'm using Debezium Oracle v1.9. My connector is running normally, but with the following status:

      "connector": {
            "state": "RUNNING",
            "worker_id": "null:-1"
        },

    Is worker_id: null normal?

    hurahurat
    @hurahurat
    Hi everyone. How do I overcome this error:
    com.github.shyiko.mysql.binlog.network.ServerException: Client requested master to start replication from impossible position; the first event 'mysql-bin.000001' at 43109, the last event read from 'mysql-bin.000001' at 4
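    (That error means the stored offset points at position 43109 of mysql-bin.000001, but the server's copy of that file ends almost immediately, typically because the binlog was purged or reset. You can confirm what the server still has with the statements below, then re-snapshot by clearing the connector's offsets or registering it under a new name:)

    SHOW BINARY LOGS;
    SHOW MASTER STATUS;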
    Nhat Nguyen
    @ntnhaatj
    Hi everyone, I am trying to deploy an Avro schema registry (using Apicurio) for serialization, but I ran into this problem. Could anyone help me, please? Thanks a lot
    connect      | 2022-05-16 03:33:55,790 ERROR  ||  Stopping due to error   [org.apache.kafka.connect.cli.ConnectDistributed]
    connect      | org.apache.kafka.common.config.ConfigException: Invalid value io.apicurio.registry.utils.converter.AvroConverter for configuration key.converter: Class io.apicurio.registry.utils.converter.AvroConverter could not be found.
    connect      |     at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728)
    connect      |     at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
    connect      |     at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
    connect      |     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
    connect      |     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
    connect      |     at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:385)
    connect      |     at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:379)
    connect      |     at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:93)
    connect      |     at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
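    (The ConfigException above just means the AvroConverter class is not on the Connect worker's classpath. If you are running the debezium/connect container image, an assumption here, it ships the Apicurio converters but only puts them on the classpath when asked:)

    # debezium/connect images: opt in to the bundled Apicurio converters
    docker run -e ENABLE_APICURIO_CONVERTERS=true <other args> debezium/connect:1.9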
    Robert B. Hanviriyapunt
    @RobertHana

    Hi everyone, I'm getting the following error with a MySQL connector:

    Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.

    Has anyone seen this before? Could this be the cause of my MySQL connector suddenly not working?

    vaibhav pandey
    @vaibhavpandey2000
    Is there any property to change the key type, e.g. from text (Struct{id=1881693}) to JSON ({id=1881693})?
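    (The Struct{...} rendering suggests the key converter is the StringConverter. Switching the key converter to JSON should give the shape you describe; schemas.enable=false drops the schema envelope so only the payload remains. A sketch for the connector config:)

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false"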
    k-tomasz
    @k-tomasz

    Hi all, I'm using the Debezium Oracle source connector with Avro. The goal is to fetch data and schema and move them to another Oracle DB with exactly the same schema.
    Unfortunately, I have a special character in a column name, and Avro doesn't like it.
    I tried the following:

        "transforms": "RenameField",
        "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.RenameField.renames": "COL_VAL#:COL_VAL_",

    but it doesn't work; it is still failing with org.apache.avro.SchemaParseException: Illegal character in: COL_VAL#
    Could you please advise?
    Below is an excerpt from the schema (I can attach the whole thing if needed):

    {
      "connect.name": "my_topic.Envelope",
      "fields": [
        {
          "default": null,
          "name": "before",
          "type": [
            "null",
            {
              "connect.name": "my_topic.Value",
              "fields": [
                {
                  "name": "COL_VAL_1",
                  "type": "string"
                },
                {
                  "name": "COL_VAL_CNT,
                  "type": "string"
                },
                {
                  "name": "COL_VAL#",
                  "type": "string"
                }
              ],
              "name": "Value",
              "type": "record"
            }
          ]
        },
        {
          "default": null,
          "name": "after",
          "type": [
            "null",
            "Value"
          ]
        },
        {
          "name": "source",
          "type": {
            "connect.name": "io.debezium.connector.oracle.Source",
            "fields": [
    
             ...cut...
    
            ],
            "name": "Source",
            "namespace": "io.debezium.connector.oracle",
            "type": "record"
          }
        },
        {
          "name": "op",
          "type": "string"
        },
        {
          "default": null,
          "name": "ts_ms",
          "type": [
            "null",
            "long"
          ]
        },
        {
          "default": null,
          "name": "transaction",
          "type": [
            "null",
            {
              "fields": [
                {
                  "name": "id",
                  "type": "string"
                },
                {
                  "name": "total_order",
                  "type": "long"
                },
                {
                  "name": "data_collection_order",
                  "type": "long"
                }
              ],
              "name": "ConnectDefault",
              "namespace": "io.confluent.connect.avro",
              "type": "record"
            }
          ]
        }
      ],
      "name": "Envelope",
      "namespace": "my_topic",
      "type": "record"
    }
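    (One thing worth checking: ReplaceField$Value only renames top-level fields of the value, while COL_VAL# lives inside the nested before/after structs, which would explain why the rename never takes effect. Debezium also has a built-in option that rewrites characters that are not Avro-safe to underscores; a sketch of enabling it in the source connector config:)

    "sanitize.field.names": "true"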
    ahvahsky2008
    @ahvahsky2008
    Hi guys, how do I detect when the schema of a source table has changed?
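    (For the connectors that support it, schema changes are emitted as events to a dedicated topic named after the database server, controlled by a single connector option; a sketch:)

    "include.schema.changes": "true"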
    leonardara
    @leonardara
    Hi experts, I found that when I use the Debezium Pub/Sub server, the topic is NOT created automatically. Am I missing some configuration, or is this by design?
    ahvahsky2008
    @ahvahsky2008
    Hi everyone, adding a column to a table works (CDC works), but when I try to delete a column it says:
    Unable to find fields [SinkRecordField{schema=Schema{STRING}, name='test', isPrimaryKey=false}] among column names [id, sum_cost, transaction_ts] [io.confluent.connect.jdbc.sink.DbStructure]
    {
        "name": "jdbc-sink",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics.regex": "sink_db.public.(.*)",
            "connection.url": "jdbc:postgresql://db_slave:5432/sink_db?user=postgresuser&password=postgrespw",
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones": "false",
            "auto.create": "true",
            "auto.evolve": "true",
            "insert.mode": "upsert",
            "delete.enabled": "true",
            "pk.fields.regex": "(.*)id",
            "pk.mode": "record_key"
        }
    }
    ahvahsky2008
    @ahvahsky2008
    {
        "name": "pg-get-data-connector",
        "config":{
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "db_master",
            "database.port": 5432,
            "database.user": "postgresuser",
            "database.password": "postgrespw",
    
            "database.dbname" : "db_master",
            "database.server.name": "sink_db",
            "plugin.name": "pgoutput",
    
            "table.include.list": "public.(.*)",
    
            "poll.interval.ms": "1000",
            "snapshot.mode":"always",
            "schema.whitelist": "public"
    
        }
    }
    handaru
    @handaru
    Hi everyone. I just implemented standalone Debezium Server. I want to get its logs and send them to Datadog. My first problem is: how do I get the logs for standalone Debezium Server? If we deploy Debezium on top of Kafka Connect we can get the logs via JMX metrics, right? Does anyone have experience dealing with standalone Debezium Server's logs? Thanks a lot.
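    (Debezium Server is a Quarkus application, so its logging goes through the standard Quarkus settings in conf/application.properties. A sketch that writes a log file the Datadog agent could tail; the path is a placeholder:)

    quarkus.log.file.enable=true
    quarkus.log.file.path=/var/log/debezium/debezium-server.log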
    Paulo
    @paulossjunior
    Hello, I would like to use Debezium with Apache Beam and PostgreSQL. Does anyone know of a tutorial about this?
    sujithramanathan
    @sujithramanathan
    Hi Team,
    We are using the DB2 Debezium connector and we have a CLOB column. While streaming records from the DB to a Kafka topic we are getting com.ibm.db2.jcc.am.c_@2d336dda. Can anyone help me resolve this?
    sujithramanathan
    @sujithramanathan

    Hi Team,
    We are using the DB2 Debezium connector and we have a CLOB column. While streaming records from the DB to a Kafka topic we are getting com.ibm.db2.jcc.am.c_@2d336dda. Can anyone help me resolve this?

    Can anyone please help me with this? I am stuck here.

    SchliffMachine
    @SchliffMachine_twitter

    Good afternoon Team!

    At the moment we are using the Debezium MySQL connector against Aurora.
    GTID mode in MySQL is set to OFF_PERMISSIVE, which is considered enabled by this check in MySqlConnection#isGtidModeEnabled:
    return !"OFF".equalsIgnoreCase(rs.getString(2)); - so it considers anything not OFF to be ON.

    Upon restarting the process we eventually end up with the error:
    The replication sender thread cannot start in AUTO_POSITION mode: this server has GTID_MODE = OFF_PERMISSIVE instead of ON

    In other words, Debezium considers OFF_PERMISSIVE to be enabled, but in order to make progress it eventually (transitively, as the error comes from an SQL client library) needs it to be ON.

    So basically (at least from what I can tell) we can't move forward from this point without changing the configuration on Aurora.
    If I'm wrong on this, please correct me; I would be happy to hear it!

    I believe this situation could be avoided if the use of GTIDs were configurable on the Debezium side (it's an optimization, after all, not something required for Debezium to work).

    Has anyone encountered this issue before?

    ms-oyo
    @ms-oyo

    Hi, can anyone share how Debezium maintains its Postgres connection?

    The documentation covers the CDC logic, whereas I want to know how the connection between the connector and the Postgres host is established.

    More along the lines of: is it a connection pool, and does the connector use a heartbeat signal to determine whether the RDS instance is up?
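    (On the heartbeat point: Debezium has an optional heartbeat that periodically emits records to a dedicated heartbeat topic, which doubles as a liveness check of the database connection; a sketch of enabling it:)

    "heartbeat.interval.ms": "10000"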

    Aravindan C
    @aravindan.ck_gitlab
    Hi all, I'm using Debezium server to capture changes from PostgreSQL.
    Debezium uses PostgreSQL's logical replication slot to capture changes, and the slot already remembers the LSN up to which the connector has replicated.
    Is it mandatory to have a FileOffsetBackingStore to record the connector offsets? Is there a way to skip it and just rely on PostgreSQL's data?
    Ghost
    @ghost~628efad26da037398497505c
    Hi, can I listen to and get MySQL view data through Debezium?
    Luan Chanh Tran
    @luantran1995
    Hello all, could you help me understand this part: https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-is-stopped-for-a-duration?
    When I stop Debezium, insert some data into PostgreSQL, and then restart it, I do not see Debezium catch up on the data changes and send them to Kafka. Is there any config missing in the code below?
    @Bean
    public io.debezium.config.Configuration postgresConnector() throws IOException {
        File offsetStorageTempFile = File.createTempFile("offsets", ".dat");
        File dbHistoryTempFile = File.createTempFile("dbhistory", ".dat");
        return io.debezium.config.Configuration.create()
                .with("name", "pg-connector")
                .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                .with("offset.flush.interval.ms", "0")
                .with("database.hostname", postgresDbHost)
                .with("database.port", postgresDbPort)
                .with("database.user", postgresDbUsername)
                .with("database.password", postgresDbPassword)
                .with("database.dbname", postgresDbName)
                .with("database.include.list", postgresDbName)
                .with("database.server.name", "PostgreSQL")
                .with("plugin.name", "pgoutput")
                .with("table.whitelist", "public.t_se_interface,public.g_individu")
                .with("snapshot.mode", "initial")
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
                .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
                .build();
    }
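    (One likely culprit, assuming the application restarts as a fresh process: File.createTempFile creates a brand-new file on every startup, so the FileOffsetBackingStore starts empty and the connector has no memory of where it left off. Pointing the offset and history storage at fixed paths should let restarts resume; a sketch, with placeholder paths:)

    // fixed paths survive restarts, unlike File.createTempFile
    File offsetStorageFile = new File("/var/debezium/pg-connector-offsets.dat");
    File dbHistoryFile = new File("/var/debezium/pg-connector-dbhistory.dat");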
    sribarrow
    @sribarrow
    {
        "name": "emp-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "postgres",
            "database.dbname": "emp",
            "database.server.name": "localhost",
            "database.whitelist": "emp",
            "database.history.kafka.bootstrap.servers": "kafka:9092",
            "database.history.kafka.topic": "schema-changes.emp"
        }
    }
    {"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nError while validating connector config: The connection attempt failed.\nYou can also find the above list of errors at the endpoint /connector-plugins/{connectorType}/config/validate"}
    I am trying to follow this: https://hevodata.com/learn/connecting-kafka-to-postgresql/#m1 and I get this issue when I try to start the connector.
    I had a few issues with Postgres where I had to set up the environment variable, and I have even reset the Postgres password. I'm just not sure what to change. Should I use an IP instead of localhost?
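    (The "connection attempt failed" message usually comes down to plain network reachability: the hostname "postgres" has to resolve from wherever the Connect worker runs, which with Docker means both containers must share a network. A quick check, with the container name as a placeholder and assuming getent exists in the image:)

    docker exec -it <connect-container> getent hosts postgres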
    sivaaspire
    @sivaaspire:matrix.org
    azureuser@app:~/strimzi-kafka-operator/templates$ kubectl get kctr inventory-connector -o yaml
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      annotations:
        kubectl.kubernetes.io/last-applied-configuration: |
          {"apiVersion":"kafka.strimzi.io/v1beta2","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster-debezium"},"name":"inventory-connector","namespace":"default"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.allowPublicKeyRetrieval":"true","database.history.kafka.bootstrap.servers":"fqcevent.servicebus.windows.net:9093","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"#####","database.password":"######","database.port":"3306","database.server.id":"1","database.server.name":"app","database.user":"root","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
      creationTimestamp: "2022-06-15T08:18:16Z"
      generation: 1
      labels:
        strimzi.io/cluster: my-connect-cluster-debezium
      name: inventory-connector
      namespace: default
      resourceVersion: "627127"
      uid: f416dbd4-6781-48f4-8b4d-6f4ae6c0d439
    spec:
      class: io.debezium.connector.mysql.MySqlConnector
      config:
        database.allowPublicKeyRetrieval: "true"
        database.history.kafka.bootstrap.servers: fqcevent.servicebus.windows.net:9093
        database.history.kafka.topic: schema-changes.inventory
        database.hostname: "#########"
        database.password: "#########"
        database.port: "3306"
        database.server.id: "1"
        database.server.name: app
        database.user: root
        database.whitelist: inventory
        include.schema.changes: "true"
      tasksMax: 1
    status:
      conditions:
      - lastTransitionTime: "2022-06-15T08:18:18.031523Z"
        message: 'GET /connectors/inventory-connector/topics returned 404 (Not Found):
          Unexpected status code'
        reason: ConnectRestException
        status: "True"
        type: NotReady
      observedGeneration: 1
      tasksMax: 1
      topics: []
    Shantnu Jain
    @shantnuj_gitlab

    I have the following setup
    Oracle -> Kafka -> PostgreSQL
    Source Connector config is

    {
        "name":"myfirst-connector",
        "config":{
            "connector.class":"io.debezium.connector.oracle.OracleConnector",
            "tasks.max":"1",
            "database.hostname":"192.168.29.102",
            "database.port":"1521",
            "database.user":"c##dbzuser",
            "database.password":"dbz",
            "database.dbname":"ORCLCDB",
            "database.pdb.name":"ORCLPDB1",
            "database.server.name":"oracle19",
            "database.connection.adapter":"logminer",
            "database.history.kafka.topic":"schema_changes",
            "database.history.kafka.bootstrap.servers":"192.168.29.102:9092",
            "database.tablename.case.insensitive":"true",
            "snapshot.mode":"initial",
            "tombstones.on.delete":"true",
            "include.schema.changes": "true",
            "sanitize.field.names":"true",
            "key.converter":"org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable":"true",
            "value.converter":"org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable":"true",
            "time.precision.mode": "connect",
            "database.oracle.version":19
        }
    }

    Sink connector config is

    {
        "name": "myjdbc-sink-testdebezium",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "topics.regex": "oracle19.C__DBZUSER.*",
            "connection.url": "jdbc:postgresql://192.168.29.102:5432/postgres?user=puser&password=My19cPassword",
            "key.converter":"org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable":"true",
            "value.converter":"org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable":"true",
            "dialect.name": "PostgreSqlDatabaseDialect",
            "auto.create": "true",
            "auto.evolve": "true",
            "insert.mode": "upsert",
            "delete.enabled": "true",
            "transforms": "unwrap, RemoveString, TimestampConverter",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.delete.handling.mode": "none",
            "transforms.RemoveString.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.RemoveString.regex": "(.*)\\.C__DBZUSER\\.(.*)",
            "transforms.RemoveString.replacement": "$2",
            "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
            "transforms.TimestampConverter.target.type": "Timestamp",
            "transforms.TimestampConverter.field": "dob",
            "pk.mode": "record_key"
        }
    }

    Now, when I drop a table in Oracle I get an entry in the schema_changes topic, but the table is not dropped from PostgreSQL. I need help figuring out why the drop is not being propagated. Just FYI, all the other operations, i.e. CREATE TABLE, ALTER TABLE, INSERT, UPDATE, DELETE, work fine. Only DROP is not working, and I am not getting any exception either.

    David Daniel Arch
    @daviddanielarch_twitter
    Hi all, I've set up MSK Connect with Debezium but forgot to attach to the connector the configuration that sets the key.converter and value.converter configs to JsonConverter. All the topics have already been populated with my Postgres data. Does anybody know if changing the converter now means I need to sync Postgres with Kafka again?
    Daan Bosch
    @spinside007_gitlab
    Is there a way to use authentication when connecting to Postgres? Setting trust in pg_hba.conf is not secure according to this blog. https://medium.com/@lmramos.usa/debezium-cdc-postgres-c9ce4da05ce1
    Is there a way to make sure SCRAM authentication is used? Or md5?
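    (A sketch of what enforcing SCRAM in pg_hba.conf might look like; the role name and address range are placeholders, and password_encryption = 'scram-sha-256' must also be set in postgresql.conf before the password is (re)hashed:)

    # TYPE  DATABASE   USER       ADDRESS        METHOD
    host    all        debezium   10.0.0.0/24    scram-sha-256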
    koevet
    @koevet:matrix.org

    Hello, I'm trying to set up Debezium with Oracle, following this guide: https://github.com/debezium/debezium-examples/tree/main/tutorial#using-oracle

    I have the various components running (Oracle, Kafka, etc) and when I try to register the Debezium Oracle connector I get this error:

    {"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nUnable to connect: Failed to resolve Oracle database version\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
    I can't find anything wrong in the configuration, and I can connect to the dockerized Oracle on localhost:1521 without problems.
    For reference, this is the config that I'm pushing to the connector:
    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        "tasks.max": "1",
        "database.server.name": "server1",
        "database.hostname": "localhost",
        "database.port": "1521",
        "database.user": "c##dbzuser",
        "database.password": "dbz",
        "database.dbname": "ORCLCDB",
        "database.pdb.name": "ORCLPDB1",
        "database.connection.adapter": "logminer",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
      }
    }
    finally, this is the stack trace from the connector: https://gist.github.com/luciano-fiandesio/0a78e6b2d3aeb9e1b56269eb2ad4715d
    I have set up Debezium on Postgres before and didn't have a single problem. Any idea where the problem may be?
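    (One suspicion, assuming Kafka Connect runs in its own container as in the linked tutorial: "database.hostname": "localhost" resolves to the Connect container itself, not to the Oracle container, which would produce exactly this validation failure even though localhost:1521 works from the host machine. The usual fix is the Oracle container's service name on the shared Docker network; the name below is a placeholder:)

        "database.hostname": "oracle"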
    leexuGit
    @leexuGit
    Does Debezium CDC support Oracle 10g?
    Butter Ngo
    @butterngo
    Hi team, I'm now getting a problem with message filtering: "Validation error: transforms.filter.type - Invalid value io.debezium.transforms.Filter for configuration transforms.filter.type: Class io.debezium.transforms.Filter could not be found." I added the plugin following the documentation (https://debezium.io/documentation/reference/1.3/configuration/filtering.html), but is there any configuration I'm missing?