    kc93
    @kc93
    Hi everyone, I am using the Debezium LogMiner connector for Oracle, and my source-side table is quite complex. When the table data changes, the Debezium connector throws JSqlParser parsing errors and the changed data is then lost. It looks like the SQL statement being parsed is misaligned.
    yuristpsa
    @yuristpsa

    Hello all, I'm using debezium with PostgreSQL 11.9 on x86_64-pc-linux-gnu with pgoutput running on AWS, debezium version 1.1.

    I am investigating a high CPU usage issue that happens only when connectors are active.
    Every time the CPU spikes happen, I see this query in pg_replication.

    Querying pg_stat_activity shows that this query in pg_replication is running infinitely.

    During this period there is no error or connectors restart in logs.

    Does anyone have any tips about this issue?

    1 reply
    pg_stat_activity.png
    amazon-rds.png
    yuristpsa
    @yuristpsa

    Hi all,

    Is there some way to prevent Debezium from generating messages when there are no differences between the old and new values?

    gitcliff
    @gitcliff
    Hello everyone
    I am using an embedded Debezium MySQL connector on a project I am working on. Since the connector captures incremental updates from the MySQL database binlog and then streams them (to a FHIR data store, in my use case), the embedded engine will always record offsets for every change event in the DB ...
    Currently in our project Debezium flushes offsets periodically even in the case of failures due to IO/networking errors when writing to the Parquet/FHIR store. ..
    How can we mark and process the failed offsets so that we can avoid data loss?
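
    For reference, a common pattern with the DebeziumEngine ChangeConsumer API is to mark a record as processed only after the downstream write has succeeded, so that offsets for failed writes are never flushed. This is only a minimal sketch under that assumption; the FhirStoreClient interface is a hypothetical placeholder for the Parquet/FHIR writer, not part of Debezium:

    import io.debezium.engine.ChangeEvent;
    import io.debezium.engine.DebeziumEngine;
    import java.util.List;

    public class FhirStoreChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {

        /** Hypothetical sink abstraction standing in for the Parquet/FHIR writer. */
        public interface FhirStoreClient {
            void write(String json) throws Exception;
        }

        private final FhirStoreClient client;

        public FhirStoreChangeConsumer(FhirStoreClient client) {
            this.client = client;
        }

        @Override
        public void handleBatch(List<ChangeEvent<String, String>> records,
                                DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer)
                throws InterruptedException {
            for (ChangeEvent<String, String> record : records) {
                try {
                    client.write(record.value());  // write to the sink first
                }
                catch (Exception e) {
                    // Leave the record unmarked: its offset is not committed, so the event is
                    // re-delivered after a restart instead of being silently dropped.
                    throw new RuntimeException("Sink write failed; stopping before this offset is flushed", e);
                }
                committer.markProcessed(record);   // only successfully written records become eligible for offset flushing
            }
            committer.markBatchFinished();
        }

        // Engine wiring (also a sketch):
        // DebeziumEngine.create(io.debezium.engine.format.Json.class)
        //     .using(props).notifying(new FhirStoreChangeConsumer(client)).build();
    }
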
    1 reply
    Dan Katz
    @dkj37_gitlab
    Hi all, I'm getting a Postgres exception when the Debezium process switches from snapshotting to replication slot logical streaming: org.postgresql.util.PSQLException: FATAL: terminating connection due to idle-in-transaction timeout. Is there a known way to resolve this? The snapshotting process takes a few hours, and it seems like the idle_in_transaction_session_timeout setting in Postgres is set to 60 minutes; does this setting need to be tuned?
    2 replies
    WYguosc
    @WYguosc
    Hi, I would like to know about the performance of Debezium Server. I measured around 5000 TPS when connecting to MySQL with this server. Is there any other configuration that can be optimized?
    How can I improve throughput?
    Piyush Kumar
    @crossdsection

    Hi, is there a setting for returning the full document in the case of a Mongo update? The mongo-kafka connector does have the setting publish.full.document.only, as mentioned here - https://docs.mongodb.com/kafka-connector/current/kafka-source

    But this does not work in the case of Debezium.

    1 reply
    Oleksii Zakernychnyi
    @alex-zakernychnyi
    Hello, can we somehow configure a keepalive setting for the SQL Server connector? https://debezium.io/documentation/reference/1.3/connectors/sqlserver.html
    The problem is that initially we get CDC events in the DB only rarely (a few events per day) and the DB closes the sleeping connection. After this the connector fails and doesn't restart automatically. To avoid a manual restart (and to keep the connection alive) it would be nice to have the connector send a keepalive request to the DB. Or maybe a workaround exists for this situation?
    9 replies
    Guy Korland
    @gkorland
    What is the release plan for Debezium Server? When is it supposed to be officially released?
    Maithri Vemula
    @MaithriVemula_twitter

    Hi, I've been trying to solve an error I am facing. [2020-12-31 21:33:05,699] INFO For table 'public.students' using select statement: 'SELECT * FROM "public"."students"' (io.debezium.relational.RelationalSnapshotChangeEventSource:310)
    [2020-12-31 21:33:05,705] INFO Snapshot - Final stage (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:79)
    [2020-12-31 21:33:05,705] ERROR Producer failure (io.debezium.pipeline.ErrorHandler:31)
    java.lang.RuntimeException: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/kafka/connect/header/ConnectHeaders
    at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:96)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/kafka/connect/header/ConnectHeaders
    at io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:389)
    at io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:156)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:85)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:48)
    at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:146)
    at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:345)
    at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:281)
    at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:139)
    at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:63)
    ... 6 more
    Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/connect/header/ConnectHeaders

    Can someone help me resolve this?

    1 reply
    Aviv Netel
    @AvivNetel11
    Hi all,
    How long does a snapshot take in Debezium? On my production DB I have more than 400 tables. Does it normally take this long? Right now it has been running for over 4 hours.
    1 reply
    Avi Mualem
    @AviMualem
    hey all,
    How can I connect Debezium to a Kafka cluster which requires authentication + SSL? For some reason I couldn't find anything about it in the docs...
    1 reply
    syedasad31
    @syedasad31

    Hi, I am trying to capture data from MySQL into Kafka and then want to use the JdbcSinkConnector to dump the data into another MySQL, but I am getting the following error

    2021-01-03 19:55:09,763 INFO || Setting metadata for table "customers" to Table{name='"customers"', type=TABLE columns=[Column{'last_name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'email', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'first_name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}]} [io.confluent.connect.jdbc.util.TableDefinitions]
    2021-01-03 19:55:09,764 INFO || Unable to find fields [SinkRecordField{schema=Schema{INT64}, name='ts_ms', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRUCT}, name='transaction', isPrimaryKey=false}, SinkRecordField{schema=Schema{dbserver1.inventory.customers.Value:STRUCT}, name='before', isPrimaryKey=false}, SinkRecordField{schema=Schema{dbserver1.inventory.customers.Value:STRUCT}, name='after', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='op', isPrimaryKey=false}, SinkRecordField{schema=Schema{io.debezium.connector.mysql.Source:STRUCT}, name='source', isPrimaryKey=false}] among column names [last_name, email, first_name, id] [io.confluent.connect.jdbc.sink.DbStructure]
    2021-01-03 19:55:09,765 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "customers" to add missing field SinkRecordField{schema=Schema{STRING}, name='op', isPrimaryKey=false}, as the field is not optional and does not have a default value [org.apache.kafka.connect.runtime.WorkerSinkTask]
    org.apache.kafka.connect.errors.ConnectException: Cannot ALTER TABLE "customers" to add missing field SinkRecordField{schema=Schema{STRING}, name='op', isPrimaryKey=false}, as the field is not optional and does not have a default value
    Can anybody help me?

    9 replies
    syedasad31
    @syedasad31
    @skyrocknroll if you can help me in this regard
    Chanh Le
    @giaosudau
    Hey everyone,
    How do I re-run a snapshot for a table?
    2 replies
    Andrew Ferrier
    @andrewferrier_twitter
    Hi folks, I'm trying to download the Debezium connectors for MS Sql Server from here: https://debezium.io/releases/1.3/. However, all the download links are returning 404 Not Found. Does anyone know if this is a known issue and/or if there's somewhere else I can get them from temporarily?
    Jiri Pechanec
    @jpechane
    @andrewferrier_twitter Hi, this is a bug in the page, thanks for reporting. Check the URL, it contains a doubled Final string
    2 replies
    Janus88
    @Janus88
    Hi everyone, I am trying to connect my MS SQL DB to Kafka using Kafka Connect + Debezium. It works until I try to do an online schema update as described here: https://debezium.io/documentation/reference/connectors/sqlserver.html#online-schema-updates. When deleting the old CDC instance I get a java.SQLServerException: invalid objectname "cdc.fn_cdc_get_all_changes_dbo_tblKafkaConnectTest". I also tried deleting the new instance with the same result:
    35 replies
    image.png
    nlevitsky
    @nlevitsky
    Hi,
    I created a Debezium connector containing the "ExtractField" transformation. My flow works as expected when I bring up all the relevant containers in my local environment: Kafka Connect produces the correct messages into Kafka. But when I run my integration test using "debezium-testing-testcontainers" v1.2.5 (I also tried 1.4-CR1 and 1.3.1-Final), I receive the following exception: "IllegalArgumentException: unknown property 'payload'".
    Neither the key nor the value transformation works. This exception kills the connector's task, which causes my test to fail.
    Why is this exception thrown? How come the same connector configuration works on my local containers?
    Please help!
    This is my connector config:
    {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.user": "root",
      "database.password": "test",
      "database.server.id": "184054",
      "database.server.name": "dbserver1",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "table_name",
      "database.allowPublicKeyRetrieval": "true",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "internal.key.converter.schemas.enable": "false",
      "internal.value.converter.schemas.enable": "false",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false",
      "database.whitelist": "database_name",
      "table.whitelist": "database_name.table_name",
      "tombstones.on.delete": "false",
      "transforms": "unwrap, extractKey, extractValue",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "true",
      "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
      "transforms.extractKey.field": "message_id",
      "transforms.extractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
      "transforms.extractValue.field": "payload"
    }
    17 replies
    syedasad31
    @syedasad31
    Can anyone help me with adding fields to the payload when using delete.handling.mode=rewrite?
    5 replies
    Thomas Thornton
    @twthorn

    Hi, I’m working on a project that uses an embedded Debezium connector to achieve exactly once semantics for CDC messages in Kafka. The way we achieve this currently is with the EmbeddedEngine class, which creates a change consumer with the following method signature:

    public void handleBatch(List<SourceRecord> records, DebeziumEngine.RecordCommitter<SourceRecord> committer)

    After processing a batch of records, we commit a message with additional metadata on the downstream state in order to achieve exactly once semantics. Specifically, we block until all messages are published to Kafka, and then commit a message that contains the current Kafka topic to offset map. With this information, we can deduplicate messages upon restart (i.e., read the committed message that has the Kafka topic to offset map, read the offset of the consumer, and skip the messages that were already published).
    However, since the EmbeddedEngine.create method is deprecated, we wanted to move to the DebeziumEngine equivalent. The signature changes to

    public void handleBatch(List<RecordChangeEvent<SourceRecord>> records,
                            DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer)

    The additional RecordChangeEvent encapsulating class prevents us from creating records with additional metadata since the interface only has a get method. Thus, the records are now immutable, and we can’t achieve our previous exactly once semantics.
    We have a few questions:

    1. Is there any workaround such that we could modify the RecordChangeEvent objects that are committed in the change consumer?
    2. If not, could we consider changing the RecordChangeEvent interface such that we could create or update the SourceRecord of the event? We could contribute to this change.
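
    For context, a minimal sketch of the new-style consumer being described (assuming the engine is built with ChangeEventFormat.of(Connect.class)); it illustrates that RecordChangeEvent<SourceRecord> only exposes the wrapped record read-only via record(), which is the limitation raised in question 1:

    import io.debezium.engine.DebeziumEngine;
    import io.debezium.engine.RecordChangeEvent;
    import java.util.List;
    import org.apache.kafka.connect.source.SourceRecord;

    public class ExactlyOnceChangeConsumer
            implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {

        @Override
        public void handleBatch(List<RecordChangeEvent<SourceRecord>> records,
                                DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer)
                throws InterruptedException {
            for (RecordChangeEvent<SourceRecord> event : records) {
                SourceRecord record = event.record();  // read-only access to the wrapped SourceRecord
                // ... publish `record` to Kafka and track the topic -> offset map for deduplication ...
                committer.markProcessed(event);        // the committer only accepts the original event,
                                                       // so extra metadata cannot be attached at this point
            }
            committer.markBatchFinished();
        }

        // Engine wiring (sketch):
        // DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        //     .using(props).notifying(new ExactlyOnceChangeConsumer()).build();
    }
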
    6 replies
    rohilVerkada
    @rohilVerkada
    For some reason, when I create my connectors against a Postgres RDS instance, topics are only generated for tables once new events come in. I believe this is because an earlier LSN has already been read. Is there any way to start with a clean connector so that it creates events for each row that is currently in the DB? In the Kafka Connect logs I see that it has already processed some LSNs... I would like to reset this so I can get the current state of the DB
    1 reply
    hitman86
    @hitman86

    Hi - I have been using Debezium to do initial snapshots of MySQL tables. Recently, when I used it for a large table with 10 million rows, we noticed the MySQL checkpoint age kept increasing and as a result there was a noticeable drop in disk writes on the source database server. What's the best way to do snapshots without impacting the source database and to mitigate the risks associated with the absence of checkpoints?

    hi - does anyone have tips regarding this? any help will be much appreciated - thanks!

    Vivek Dehariya
    @Vivek-Dh
    Hi, is there any out-of-the-box config available to treat the MySQL "Date" datatype as-is? Currently the change data JSON gives me an integer, which I believe is the number of days since Jan 1 1970. I need the exact date in my sink.
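
    For reference, the integer is Debezium's default io.debezium.time.Date representation (days since the epoch), so a consumer-side conversion can be as small as the sketch below. Alternatively, the connector option time.precision.mode=connect switches to Kafka Connect's built-in date/time logical types, which JDBC sinks generally map back to SQL DATE columns.

    import java.time.LocalDate;

    public class DebeziumDateDecoder {
        /** Converts Debezium's io.debezium.time.Date value (days since 1970-01-01) back to a LocalDate. */
        public static LocalDate fromEpochDays(int epochDays) {
            return LocalDate.ofEpochDay(epochDays);  // e.g. 18993 -> 2022-01-01
        }
    }
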
    3 replies
    abhinavcohesity
    @abhinavcohesity
    Hi, I am using Debezium Postgres with an AWS Aurora instance and the wal2json_rds_streaming plugin. What I am seeing is that my replication slot becomes inactive after some time, although the connector continues to stream events. Has anybody faced a similar issue or have any idea why this may be occurring?
    @jpechane
    3 replies
    bk
    @bbk1985

    @ating28 Hi, it seems that the connector has problems obtaining the lock to make a consistent snapshot. Could you please try snapshot.isolation.mode=snapshot if you have snapshot isolation enabled for your database?

    I don't have snapshot isolation enabled on the database; is turning it on the only solution?

    17 replies
    Thomas Daghelet
    @tdaghelet

    Hello, we sometimes get this kind of error in our Kafka Connect Debezium connector:

    [Worker clientId=connect-1, groupId=debezium-postgres-connect-cluster] Member connect-1-a9ec1a0c-3138-46d3-9054-c31825bf8325 sending LeaveGroup request to coordinator kafka.development.local:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]

    I would like to know how to detect this behaviour so we can restart the worker, because our liveness probe looks for a FAILED status, but both the worker and the task are RUNNING:

    {"name":"account-history-connector","connector":{"state":"RUNNING","worker_id":"10.130.38.235:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.130.38.235:8083"}],"type":"source"}
    4 replies
    Martin Perez
    @mpermar_gitlab

    A global order for events emitted to the schema change topic is vital. Therefore, you must not partition the database history topic. This means that you must specify a partition count of 1 when creating the database history topic. When relying on auto topic creation, make sure that Kafka’s num.partitions configuration option, which specifies the default number of partitions, is set to 1.

    Does this fragment from the MySQL connector documentation apply to other connectors too?
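
    For reference, a minimal sketch of creating such a single-partition history topic explicitly with the Kafka AdminClient rather than relying on auto-creation; the topic name dbhistory.inventory, the broker address, and the replication factor 3 are assumptions for illustration:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateHistoryTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka:9092");  // assumed broker address
            try (Admin admin = Admin.create(props)) {
                NewTopic history = new NewTopic("dbhistory.inventory", 1, (short) 3)  // exactly 1 partition
                        .configs(Map.of("retention.ms", "-1"));  // keep the history records indefinitely
                admin.createTopics(List.of(history)).all().get();
            }
        }
    }
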

    2 replies
    OT: How do people get rid of the _gitlab, _twitter... from their nickname?
    mparikhcloudbeds
    @mparikhcloudbeds

    Has anyone had a use case where they wanted to move their MySQL data using Debezium into a destination Cassandra keyspace? I'm using the DataStax OSS Cassandra connector, and one of its properties expects a config of the form "topic.my_topic.key_space.my_table.mapping"...

    Now, with the Debezium MySQL connector the auto-generated Kafka topic has a naming pattern such as "logical-server-name.schema_name.table_name".. It seems like the Cassandra connector won't like that...

    Any pointers to solve this issue will be really appreciated

    1 reply
    tonysun0123
    @tonysun0123

    Hi Jiri and other Debezium developers, we need your help again!
    When we ran our application integrated with embedded Debezium, we saw the warning logs below:
    2021-Jan-05 16:51:42.513 WARN [debezium-sqlserverconnector-dbo-change-event-source-coordinator] i.d.c.s.SqlServerStreamingChangeEventSource - No table has enabled CDC or security constraints prevents getting the list of change tables
    2021-Jan-05 16:51:42.513 WARN [debezium-sqlserverconnector-dbo-change-event-source-coordinator] i.d.c.s.SqlServerStreamingChangeEventSource - No whitelisted table has enabled CDC, whitelisted table list does not contain any table with CDC enabled or no table match the white/blacklist filter(s)
    Further investigation found that the warning logs were generated by the function getCdcTablesToQuery() of class io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource. The function getCdcTablesToQuery() ran query "EXEC sys.sp_cdc_help_change_data_capture" against the database to get the list of cdcEnabledTables, but somehow it got an empty list and then it generated those warning logs. However when I ran the same query on our database directly, I did get the result showing the table is CDC enabled.

    Did we do anything wrong in the configuration or somewhere else? Thanks!

    3 replies
    WYguosc
    @WYguosc
    image.png
    14 replies
    Hi, I use Debezium Server with Oracle. Can anyone tell me why it's locked?
    1 reply
    Per Kristian Fjellby
    @perkrifj_gitlab
    Hi Debezium community! We're trialling a setup where we're moving data from one MySQL instance to another. On the source side we're using the MySQL connector and the sinks are JDBC sink connectors. We seem to be hitting a lot of weird cases when it comes to conversion of timestamps and datetimes. Is everyone writing their own custom formatter or is there a standardized way of handling this? As an example, in one update 4 of the time-related columns come through in ISO 8601 format; for the same table, but a different UPDATE, 1 column has the ISO 8601 format while the 3 remaining do not. How are you folks handling this? Any feedback is appreciated! Thanks.
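
    For what it's worth, the mixed formats are usually Debezium's default temporal mapping rather than randomness: with the default time.precision.mode, MySQL TIMESTAMP columns are emitted as io.debezium.time.ZonedTimestamp ISO 8601 strings, while DATETIME columns are emitted as io.debezium.time.Timestamp epoch-millisecond numbers. A small consumer-side normalizer, as a sketch under that assumption:

    import java.time.Instant;
    import java.time.OffsetDateTime;

    public class DebeziumTemporalNormalizer {

        /**
         * Normalizes a Debezium temporal field value to an Instant.
         * ZonedTimestamp arrives as an ISO 8601 string, Timestamp as epoch milliseconds.
         */
        public static Instant normalize(Object value) {
            if (value instanceof String) {
                return OffsetDateTime.parse((String) value).toInstant();  // e.g. "2021-01-05T16:51:42.513Z"
            }
            if (value instanceof Long) {
                return Instant.ofEpochMilli((Long) value);                // io.debezium.time.Timestamp
            }
            throw new IllegalArgumentException("Unexpected temporal representation: " + value);
        }
    }
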
    20 replies
    Alok Kumar Singh
    @alok87
    I am working on setting up authentication between Debezium and the Kafka brokers. Please point me to the documentation for the available options and examples.
    8 replies
    RadioGuy
    @RadioGuy
    Hi Team,
    What is the minimum CPU requirement for the Debezium Postgres connector with no load?
    I create a Postgres connector on container startup and usage goes up to 1034 MB.
    It fails if I keep the container CPU at 512 MHz.
    3 replies
    caoquyvu
    @caoquyvu

    Hi everyone and @jpechane. I have a question about this article:
    https://debezium.io/blog/2020/04/09/using-debezium-with-apicurio-api-schema-registry/

    How can I parse this schema

    {
      "type": "struct",
      "fields": [
        {
          "type": "struct",
          "fields": [
            {
              "type": "int32",
              "optional": false,
              "field": "id"
            },
            {
              "type": "string",
              "optional": false,
              "field": "first_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "last_name"
            },
            {
              "type": "string",
              "optional": false,
              "field": "email"
            }
          ],
          "optional": true,
          "name": "dbserver1.inventory.customers.Value",
          "field": "before"
        },
        ...
      ],
      "optional": false,
      "name": "dbserver1.inventory.customers.Envelope"
    }

    into this Avro schema

    {
      "type": "record",
      "name": "Envelope",
      "namespace": "dbserver1.inventory.customers",
      "fields": [
        {
          "name": "before",
          "type": [
            "null",
            {
              "type": "record",
              "name": "Value",
              "fields": [
                {
                  "name": "id",
                  "type": "int"
                },
                {
                  "name": "first_name",
                  "type": "string"
                },
                {
                  "name": "last_name",
                  "type": "string"
                },
                {
                  "name": "email",
                  "type": "string"
                }
              ],
              "connect.name": "dbserver1.inventory.customers.Value"
            }
          ],
          "default": null
        },
        {
          "name": "after",
          "type": [
            "null",
            "Value"
          ],
          "default": null
        },
        ...
      ],
      "connect.name": "dbserver1.inventory.customers.Envelope"
    }

    Are there any libs I can use to parse this Debezium schema into an Avro schema? Many thanks.
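
    For reference, a minimal sketch of one possible approach, assuming the full Debezium JSON message (with both "schema" and "payload") is available and that Confluent's kafka-connect-avro-converter artifact (which provides io.confluent.connect.avro.AvroData) is on the classpath:

    import io.confluent.connect.avro.AvroData;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.json.JsonConverter;

    public class DebeziumSchemaToAvro {

        public static org.apache.avro.Schema toAvroSchema(String debeziumJsonMessage) {
            // Parse the {"schema": ..., "payload": ...} envelope into a Kafka Connect schema + value.
            JsonConverter jsonConverter = new JsonConverter();
            jsonConverter.configure(Map.of("schemas.enable", "true"), false);
            SchemaAndValue connectData = jsonConverter.toConnectData(
                    "dbserver1.inventory.customers",  // topic name (not significant for the schema conversion)
                    debeziumJsonMessage.getBytes(StandardCharsets.UTF_8));

            // AvroData performs the Connect-schema -> Avro-schema mapping, including the connect.name annotations.
            AvroData avroData = new AvroData(100);
            return avroData.fromConnectSchema(connectData.schema());
        }
    }
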

    2 replies
    dctmswetha
    @dctmswetha

    Hi there, I am getting this error while trying to read the Oracle LogMiner logs. Any idea how to resolve it?

    ERROR Mining session stopped due to the {} (io.debezium.connector.oracle.logminer.LogMinerHelper)
    java.sql.SQLException: ORA-01291: missing log file

    1 reply
    Chris Cranford
    @Naros
    Hi @dctmswetha this could be related to https://issues.redhat.com/browse/DBZ-2855.
    I have a proposed PR in the works, but I still need to build a test case and I'm trying to understand how Oracle gets into this state so I can reproduce it.
    dctmswetha
    @dctmswetha

    @Naros thank you for getting back to me. I am trying to connect to an Oracle DB and read the LogMiner redo logs. I have followed the steps described in this link:
    https://debezium.io/documentation/reference/1.3/connectors/oracle.html#oracle-connector-properties

    What I understood is that Debezium is able to authenticate to the DB but is not able to capture the initial snapshot of the table I need.

    HuPengCheng
    @HuPengCheng
    Hi, I am trying to set up the Debezium connector for Oracle on a test machine, testing with just one table in a CDB/PDB Oracle 12c setup. The connector failed with the following error: Caused by: java.lang.IllegalArgumentException: No metadata registered for captured table ORCLCDB.C##XSTRM.TEST. There is no topic for my table in Kafka,
    only the topic from my connector's "database.history.kafka.topic".
    HuPengCheng
    @HuPengCheng
    image.png
    2 replies
    rahulopenxcell
    @rahulopenxcell
    Hey, is it advisable to use the Lambda sink connector on Confluent with a very high frequency of data updates?
    8 replies
    @jpechane