    sivaaspire
    @sivaaspire:matrix.org
    [m]
    azureuser@app:~/strimzi-kafka-operator/templates$ kubectl get kctr inventory-connector -o yaml
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      annotations:
        kubectl.kubernetes.io/last-applied-configuration: |
          {"apiVersion":"kafka.strimzi.io/v1beta2","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster-debezium"},"name":"inventory-connector","namespace":"default"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.allowPublicKeyRetrieval":"true","database.history.kafka.bootstrap.servers":"fqcevent.servicebus.windows.net:9093","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"#####","database.password":"######","database.port":"3306","database.server.id":"1","database.server.name":"app","database.user":"root","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
      creationTimestamp: "2022-06-15T08:18:16Z"
      generation: 1
      labels:
        strimzi.io/cluster: my-connect-cluster-debezium
      name: inventory-connector
      namespace: default
      resourceVersion: "627127"
      uid: f416dbd4-6781-48f4-8b4d-6f4ae6c0d439
    spec:
      class: io.debezium.connector.mysql.MySqlConnector
      config:
        database.allowPublicKeyRetrieval: "true"
        database.history.kafka.bootstrap.servers: fqcevent.servicebus.windows.net:9093
        database.history.kafka.topic: schema-changes.inventory
        database.hostname: '#########'
        database.password: '#########'
        database.port: "3306"
        database.server.id: "1"
        database.server.name: app
        database.user: root
        database.whitelist: inventory
        include.schema.changes: "true"
      tasksMax: 1
    status:
      conditions:
      - lastTransitionTime: "2022-06-15T08:18:18.031523Z"
        message: 'GET /connectors/inventory-connector/topics returned 404 (Not Found):
          Unexpected status code'
        reason: ConnectRestException
        status: "True"
        type: NotReady
      observedGeneration: 1
      tasksMax: 1
      topics: []
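    The 404 in the NotReady condition is itself a clue: the GET /connectors/<name>/topics endpoint (KIP-558) only exists on Kafka Connect 2.5+, so an older Connect image answers 404 even for an otherwise healthy connector. A minimal sketch for digging further, assuming the usual Strimzi naming of the Connect REST service (adjust to your deployment):

    # the Strimzi Connect REST service is conventionally <cluster>-connect-api
    kubectl port-forward svc/my-connect-cluster-debezium-connect-api 8083:8083 &
    curl -s localhost:8083/ | jq .version        # Connect version; /topics needs 2.5+
    curl -s localhost:8083/connectors/inventory-connector/status | jq .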
    Shantnu Jain
    @shantnuj_gitlab

    I have the following setup
    Oracle -> Kafka -> PostgreSQL
    Source Connector config is

    {
        "name":"myfirst-connector",
        "config":{
            "connector.class":"io.debezium.connector.oracle.OracleConnector",
            "tasks.max":"1",
            "database.hostname":"192.168.29.102",
            "database.port":"1521",
            "database.user":"c##dbzuser",
            "database.password":"dbz",
            "database.dbname":"ORCLCDB",
            "database.pdb.name":"ORCLPDB1",
            "database.server.name":"oracle19",
            "database.connection.adapter":"logminer",
            "database.history.kafka.topic":"schema_changes",
            "database.history.kafka.bootstrap.servers":"192.168.29.102:9092",
            "database.tablename.case.insensitive":"true",
            "snapshot.mode":"initial",
            "tombstones.on.delete":"true",
            "include.schema.changes": "true",
            "sanitize.field.names":"true",
            "key.converter":"org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable":"true",
            "value.converter":"org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable":"true",
            "time.precision.mode": "connect",
            "database.oracle.version":19
        }
    }

    Sink connector config is

    {
        "name": "myjdbc-sink-testdebezium",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "topics.regex": "oracle19.C__DBZUSER.*",
            "connection.url": "jdbc:postgresql://192.168.29.102:5432/postgres?user=puser&password=My19cPassword",
            "key.converter":"org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable":"true",
            "value.converter":"org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable":"true",
            "dialect.name": "PostgreSqlDatabaseDialect",
            "auto.create": "true",
            "auto.evolve": "true",
            "insert.mode": "upsert",
            "delete.enabled": "true",
            "transforms": "unwrap, RemoveString, TimestampConverter",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.delete.handling.mode": "none",
            "transforms.RemoveString.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.RemoveString.regex": "(.*)\\.C__DBZUSER\\.(.*)",
            "transforms.RemoveString.replacement": "$2",
            "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
            "transforms.TimestampConverter.target.type": "Timestamp",
            "transforms.TimestampConverter.field": "dob",
            "pk.mode": "record_key"
        }
    }

    Now when I drop a table in Oracle I get an entry in the schema_changes topic, but the table is not dropped from PostgreSQL. I need help figuring out why the drop is not getting propagated. Just FYI, all the other operations, i.e. Create Table, Alter Table, Insert, Update, Delete, are working fine. Only DROP is not working, and I am not getting any exception either.
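    A note on why only DROP misbehaves: the schema_changes history topic is consumed only by the source connector itself, and the JDBC sink's auto.create/auto.evolve options only ever CREATE tables and ADD columns; they never execute drops. So a DROP TABLE has to be applied to the sink database out of band, for example (table name assumed for illustration):

    -- run against the sink PostgreSQL after dropping the source table
    DROP TABLE IF EXISTS customers;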

    David Daniel Arch
    @daviddanielarch_twitter
    hi all, I've set up MSK Connect with Debezium but forgot to attach to the connector the configuration that sets the key.converter and value.converter configs to JsonConverter. All the topics have already been populated with my Postgres data. Does anybody know if changing the converter now means I need to sync Postgres with Kafka again?
    Daan Bosch
    @spinside007_gitlab
    Is there a way to use authentication when connecting to Postgres? Setting trust in pg_hba.conf is not secure according to this blog. https://medium.com/@lmramos.usa/debezium-cdc-postgres-c9ce4da05ce1
    Is there a way to make sure SCRAM authentication is used, or md5?
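    A minimal sketch of enforcing SCRAM on the server side, assuming the Debezium role is named debezium and connects from 10.0.0.0/8 (use md5 in both places for md5 instead). Note that after changing password_encryption the role's password must be re-set so it is stored as a SCRAM verifier:

    # postgresql.conf
    password_encryption = scram-sha-256

    # pg_hba.conf: Debezium needs a regular connection and a replication connection
    host  all          debezium  10.0.0.0/8  scram-sha-256
    host  replication  debezium  10.0.0.0/8  scram-sha-256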
    koevet
    @koevet:matrix.org
    [m]

    hello, I'm trying to set up Debezium with Oracle and I'm following this guide: https://github.com/debezium/debezium-examples/tree/main/tutorial#using-oracle

    I have the various components running (Oracle, Kafka, etc) and when I try to register the Debezium Oracle connector I get this error:

    {"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nUnable to connect: Failed to resolve Oracle database version\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
    I can't find anything wrong in the configuration, and I can connect to the dockerized Oracle on localhost:1521 without problems.
    For reference, this is the config I'm pushing to the connector:
    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        "tasks.max": "1",
        "database.server.name": "server1",
        "database.hostname": "localhost",
        "database.port": "1521",
        "database.user": "c##dbzuser",
        "database.password": "dbz",
        "database.dbname": "ORCLCDB",
        "database.pdb.name": "ORCLPDB1",
        "database.connection.adapter": "logminer",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
      }
    }
    finally, this is the stack trace from the connector: https://gist.github.com/luciano-fiandesio/0a78e6b2d3aeb9e1b56269eb2ad4715d
    I have set up Debezium before on Postgres and I didn't have a single problem; any idea where the problem may be?
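    One thing worth checking before the stack trace: if Kafka Connect itself runs in a container, as in that tutorial, then "database.hostname": "localhost" resolves to the Connect container rather than to Oracle, even though localhost:1521 works from the host machine. A sketch of the fix, with the service name assumed (match it to your compose file):

    "database.hostname": "oracle",
    "database.port": "1521",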
    leexuGit
    @leexuGit
    Does CDC support Oracle 10G?
    Butter Ngo
    @butterngo
    Hi team, I'm now getting a problem with Message Filtering: "Validation error: transforms.filter.type - Invalid value io.debezium.transforms.Filter for configuration transforms.filter.type: Class io.debezium.transforms.Filter could not be found." I added the plugins following the documentation (https://debezium.io/documentation/reference/1.3/configuration/filtering.html), but is there any missing configuration? Please help me.
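    The Filter SMT does not ship inside the connector archive; it lives in the separate debezium-scripting artifact and additionally needs a JSR-223 language implementation such as Groovy on the same plugin path. A minimal sketch, with versions and the plugin directory assumed:

    # copy into the SAME plugin directory as the connector, then restart Connect
    cp debezium-scripting-1.3.1.Final.jar /kafka/connect/debezium-connector-mysql/
    cp groovy-3.0.9.jar groovy-jsr223-3.0.9.jar /kafka/connect/debezium-connector-mysql/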
    Marcos
    @mmaia

    I have a connector that finishes the snapshot phase but then fails every time. I've been looking for the issue for a couple of hours and can't find it; any ideas:

    connect  | [2022-07-10 13:19:44,310] INFO Snapshot ended with SnapshotResult [status=COMPLETED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=197, currentRowNumber=0, serverId=0, sourceTime=2022-07-10T11:19:44.308Z, threadId=-1, currentQuery=null, tableIds=[userTransactions.EUR], databaseName=userTransactions], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=1d8b8ba9-0042-11ed-b7a0-0242c0a89006:1-114, currentGtidSet=1d8b8ba9-0042-11ed-b7a0-0242c0a89006:1-114, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=197, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] (io.debezium.pipeline.ChangeEventSourceCoordinator)
    connect  | [2022-07-10 13:19:44,310] INFO Requested thread factory for connector MySqlConnector, id = userTransactions named = binlog-client (io.debezium.util.Threads)
    connect  | [2022-07-10 13:19:44,317] INFO Requested thread factory for connector MySqlConnector, id = userTransactions named = kafka-signal (io.debezium.util.Threads)
    connect  | [2022-07-10 13:19:44,317] ERROR Producer failure (io.debezium.pipeline.ErrorHandler)
    connect  | java.lang.NullPointerException
    connect  |     at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
    connect  |     at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
    connect  |     at java.base/java.util.Properties.put(Properties.java:1340)
    connect  |     at java.base/java.util.Properties.setProperty(Properties.java:228)
    connect  |     at io.debezium.config.Configuration$Builder.withDefault(Configuration.java:687)
    connect  |     at io.debezium.connector.mysql.signal.KafkaSignalThread.<init>(KafkaSignalThread.java:101)
    connect  |     at io.debezium.connector.mysql.MySqlReadOnlyIncrementalSnapshotChangeEventSource.<init>(MySqlReadOnlyIncrementalSnapshotChangeEventSource.java:91)
    connect  |     at io.debezium.connector.mysql.MySqlChangeEventSourceFactory.getIncrementalSnapshotChangeEventSource(MySqlChangeEventSourceFactory.java:91)
    ...

    and then:

    connect  | [2022-07-10 13:19:45,074] ERROR WorkerSourceTask{id=Wallets-Debezium-Mysql-UserTransactions-V2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
    connect  | org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    connect  |     at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
    connect  |     at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:116)
    connect  |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    connect  |     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    connect  |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    connect  |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    connect  |     at java.base/java.lang.Thread.run(Thread.java:829)
    connect  | Caused by: java.lang.NullPointerException
    connect  |     at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
    connect  |     at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
    connect  |     at java.base/java.util.Properties.put(Properties.java:1340)
    connect  |     at java.base/java.util.Properties.setProperty(Proper
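    The trace points at KafkaSignalThread, which the MySQL connector only constructs when read-only incremental snapshots are enabled, and an NPE inside Properties.setProperty means one of the signal.kafka.* values resolved to null. A sketch of the settings that code path expects, with placeholder values (a guess, not a confirmed diagnosis):

    "read.only": "true",
    "signal.kafka.topic": "dbz-signals",
    "signal.kafka.bootstrap.servers": "kafka:9092"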
    Naveenkumar seerangan
    @naveenseerangan:matrix.org
    [m]
    Hi,
    Yarin Benado
    @yarinb
    Is there any way to remove a table from an "in-progress" incremental snapshot? It's a huge table and we'd like to stop the snapshot process for this specific table.
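    Recent Debezium releases (1.9 and later) accept a stop-snapshot signal that removes tables from a running incremental snapshot. A minimal sketch, assuming a signal table named debezium_signal and a captured table schema.big_table:

    INSERT INTO debezium_signal (id, type, data)
    VALUES ('stop-1', 'stop-snapshot',
            '{"data-collections": ["schema.big_table"], "type": "incremental"}');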
    ajinkya-centime
    @ajinkya-centime
    I'm trying to reroute the Debezium messages from all the different topics to the topic "cdc_event" with the following config:

    {
        "name": "centime-debezium-connector",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": "{{db_endpoint}}",
            "database.port": "9909",
            "database.user": "{{user}}",
            "database.password": "{{pwd}}",
            "database.server.id": "7799838800",
            "database.server.name": "dev_db_local_test",
            "database.history.kafka.bootstrap.servers": "localhost:9092",
            "database.history.kafka.topic": "dbhistory",
            "table.include.list": "CENTIME_GL_DB.bill_payment_schedule_cdc,CENTIME_PAYMENT_DB.payments_cdc",
            "event.processing.failure.handling.mode": "fail",
            "include.schema.changes": "true",
            "time.precision.mode": "connect",
            "decimal.handling.mode": "double",
            "transforms": "reRoute",
            "transforms.reRoute.type": "io.debezium.transforms.ByLogicalTableRouter",
            "transforms.reRoute.regex": "^.(cdc|bill|payments).$",
            "transforms.reRoute.replacement": "cdc_events"
        }
    }

    Whenever I register the Debezium connector on Kafka Connect I get the following error:

    [2022-07-26 12:21:22,403] INFO [centime-debezium-connector|task-0] EnrichedConnectorConfig values:
    config.action.reload = restart
    connector.class = io.debezium.connector.mysql.MySqlConnector
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = null
    name = centime-debezium-connector
    predicates = []
    tasks.max = 1
    topic.creation.groups = []
    transforms = [reRoute]
    transforms.reRoute.key.enforce.uniqueness = true
    transforms.reRoute.key.field.regex = null
    transforms.reRoute.key.field.replacement = null
    transforms.reRoute.negate = false
    transforms.reRoute.predicate =
    transforms.reRoute.topic.regex = null
    transforms.reRoute.topic.replacement = null
    transforms.reRoute.type = class io.debezium.transforms.ByLogicalTableRouter
    value.converter = null
    (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:376)
    [2022-07-26 12:21:22,404] ERROR [centime-debezium-connector|task-0] The 'topic.regex' value is invalid: A value is required (io.debezium.transforms.ByLogicalTableRouter:1881)
    [2022-07-26 12:21:22,404] ERROR [centime-debezium-connector|task-0] The 'topic.replacement' value is invalid: A value is required (io.debezium.transforms.ByLogicalTableRouter:1881)
    [2022-07-26 12:21:22,405] ERROR [centime-debezium-connector|task-0] Failed to start task centime-debezium-connector-0 (org.apache.kafka.connect.runtime.Worker:550)
    org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.ConnectException: Unable to validate config.
    at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:295)
    at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:593)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:545)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1421)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$22(DistributedHerder.java:1434)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.kafka.connect.errors.ConnectException: Unable to validate config.
    at io.debezium.transforms.ByLogicalTableRouter.configure(ByLogicalTableRouter.java:180)
    at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:284)
    ... 8 more
    From the logs I see that even though I'm passing the topic regex pattern and replacement, they show as null when the config is printed.
    Can anybody help with what is wrong with the config?
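    The two ERROR lines name the problem exactly: ByLogicalTableRouter reads topic.regex and topic.replacement, while the config above sets transforms.reRoute.regex and transforms.reRoute.replacement, so the SMT sees null for both (which is why the printed EnrichedConnectorConfig shows topic.regex = null). A sketch of the corrected fragment; note also that "^.(cdc|bill|payments).$" allows only a single character on each side, so "^.*(cdc|bill|payments).*$" is probably what was intended:

    "transforms": "reRoute",
    "transforms.reRoute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.reRoute.topic.regex": "^.*(cdc|bill|payments).*$",
    "transforms.reRoute.topic.replacement": "cdc_events"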
    JFercan
    @JFercan
    Will a connector keep the database connection open even when both Connector and Tasks are paused? This is for a connection to MS SQL. Thank you.
    Suryaprakash S
    @suryaares
    I am getting this error while streaming data from Debezium Server to Redis on two separate EC2 instances.
    2022-08-03 11:27:47,711 ERROR [io.qua.run.Application] (main) Failed to start application (with profile prod): java.net.ConnectException: Connection refused (Connection refused)
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
    at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.base/java.net.Socket.connect(Socket.java:609)
    at redis.clients.jedis.DefaultJedisSocketFactory.createSocket(DefaultJedisSocketFactory.java:53)
    at redis.clients.jedis.Connection.connect(Connection.java:163)
    at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:113)
    at redis.clients.jedis.Connection.sendCommand(Connection.java:119)
    at redis.clients.jedis.Connection.sendCommand(Connection.java:110)
    at redis.clients.jedis.BinaryClient.auth(BinaryClient.java:649)
    at redis.clients.jedis.BinaryJedis.auth(BinaryJedis.java:2416)
    at io.debezium.server.redis.RedisStreamChangeConsumer.connect(RedisStreamChangeConsumer.java:85)
    at io.debezium.server.redis.RedisStreamChangeConsumer_Bean.create(RedisStreamChangeConsumer_Bean.zig:704)
    at io.debezium.server.redis.RedisStreamChangeConsumer_Bean.create(RedisStreamChangeConsumer_Bean.zig:720)
    at io.debezium.server.DebeziumServer.start(DebeziumServer.java:115)
    at io.debezium.server.DebeziumServer_Bean.create(DebeziumServer_Bean.zig:256)
    at io.debezium.server.DebeziumServer_Bean.create(DebeziumServer_Bean.zig:272)
    at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:96)
    at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:29)
    at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:26)
    at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:26)
    at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
    at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:26)
    at io.quarkus.arc.impl.ClientProxies.getApplicationScopedDelegate(ClientProxies.java:17)
    at io.debezium.server.DebeziumServer_ClientProxy.arc$delegate(DebeziumServer_ClientProxy.zig:67)
    at io.debezium.server.DebeziumServer_ClientProxy.arc_contextualInstance(DebeziumServer_ClientProxy.zig:82)
    at io.debezium.server.DebeziumServer_Observer_Synthetic_d70cd75bf32ab6598217b9a64a8473d65e248c05.notify(DebeziumServer_Observer_Synthetic_d70cd75bf32ab6598217b9a64a8473d65e248c05.zig:94)
    at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:300)
    at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:282)
    at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:70)
    at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:128)
    at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:97)
    at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(LifecycleEventsBuildStep$startupEvent1144526294.zig:87)
    at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(LifecycleEventsBuildStep$startupEvent1144526294.zig:40)
    at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:623)
    at io.quarkus.runtime.Application.start(Application.java:101)
    at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:101)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:66)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:42)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:119)
    at io.debezium.server.Main.main(Main.java:15)
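    The exception fires inside BinaryClient.auth, i.e. the TCP connection to Redis is refused before Debezium does anything interesting. A quick sanity check from the Debezium Server instance, host placeholder assumed:

    redis-cli -h <redis-host> -p 6379 ping    # expect PONG
    # if this fails, check the Redis bind address / protected-mode settings and the
    # EC2 security group rule for port 6379, then set:
    # debezium.sink.redis.address=<redis-host>:6379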
    Romil Punetha
    @romilpunetha

    I'm trying to use the MongoDB outbox pattern with the payload as byte[]. I'm using the quarkus-avro library, which converts my userEvent.avsc file into a UserEvent.java class that has an inbuilt encoder returning the byte[] for the class. I use a mapper to convert my original User.java instance into the UserEvent.java instance, then convert that into a byte[] and store it as the payload in the outbox table for MongoDB.

    When I use Debezium with the io.debezium.converters.ByteArrayConverter value converter, I get an alphanumeric string in Kafka. When my consumer tries to consume this as a byte[], it throws an error like "unidentified hex value".
    I have other consumers that read from the Kafka topic with configs that accept an Avro schema and byte[] and create the Java instance out of it. However, the byte sequence seems to differ after passing through MongoDB: when I print the encoded byte[] vs the one I read from Kafka, they are different. Any idea what to do here? The idea is that the Avro byte[] should be passed as-is to Kafka, where it can be read by consumers in a decode function.
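    One cheap check, sketched with placeholders: MongoDB's extended JSON renders binary fields as Base64, so the "alphanumeric string" in Kafka may simply be the payload Base64-encoded on its way through the outbox document. If decoding reproduces the original bytes, the fix is a Base64 decode on the consumer side rather than a converter change:

    echo -n '<string-from-kafka>' | base64 -d > payload.bin
    cmp payload.bin original-payload.bin    # identical => just decode before the Avro parse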

    wangshuang0220
    @wangshuang0220
    hi all, has anyone met this error: Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'jdbc-sink7' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='t2',partition=0,offset=7,timestamp=1660188308616) with a null value and null value schema.
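    That record with a null value and null schema is the Debezium tombstone emitted after a delete. Two common ways out, sketched as config fragments: either let the sink process deletes (which requires a record key),

    "delete.enabled": "true",
    "pk.mode": "record_key"

    or drop the tombstones before they reach the sink:

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true"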
    wangshuang0220
    @wangshuang0220
    Can the Confluent JDBC sink connector sync MySQL functions and procedures?
    kr1929uti
    @kr1929uti
    Hi everyone, I have a Kafka setup with Postgres as my source and sink. I am trying to implement a scenario where any DDL change at the Postgres source (such as column addition, deletion, update of a column, or column type change) is reflected in my sink Postgres table. I already have auto.evolve=true in my sink connector configuration (using the JDBC sink connector), but it is not fulfilling the requirements. Any suggestions?
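    This matches the JDBC sink's documented limits: auto.evolve only issues ALTER TABLE ... ADD for missing columns; it never drops columns, changes types, or renames. Those DDL changes have to be applied to the sink database out of band, for example (names assumed for illustration):

    ALTER TABLE customers ALTER COLUMN note TYPE text;
    ALTER TABLE customers DROP COLUMN obsolete_col;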
    kr1929uti
    @kr1929uti
    Hi all,
    Whenever I encounter schema changes (ddl changes coming in), I want to automate the table backup and table deletion and then table creation with the new schema. (I am using postgres db and I have kafka setup). Any suggestions on how to go about automation?
    rishimathur14
    @rishimathur14
    Hi team, we are facing the below error frequently, even though we have set retention.ms to 31 years and cleanup.policy to delete:
    ash-4.2$ curl -X GET -H "Accept:application/json" http://kafka-connect-service:8083/connectors/db2-connect/status
    {"name":"db2-connect-rdcdev-client-wsdiwwe","connector":{"state":"RUNNING","worker_id":"XXXX:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"XXXX:8083","trace":"io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.\n\tat io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:47)\n\tat io.debezium.connector.db2.Db2ConnectorTask.start(Db2ConnectorTask.java:88)\n\tat
    salaei
    @salaei:matrix.org
    [m]
    Hi everyone, we are using the Debezium connector to replicate data from Postgres into Kafka. The connector fails because it tries to create a new pg slot with the same name while the slot exists and is inactive. The exact error is this:
    "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)\n\tat io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:172)\n\tat io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:41)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:139)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: io.debezium.DebeziumException: Failed to start replication stream at LSN{12D1/80663BD0}; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:309)\n\tat io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:129)\n\t... 9 more\nCaused by: org.postgresql.util.PSQLException: ERROR: replication slot \"revmgmt_debezium_1\" is active for PID 30294\n\tat org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)\n\tat org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1263)\n\tat org.postgresql.core.v3.QueryExecutorImpl.startCopy(QueryExecutorImpl.java:945)\n\tat org.postgresql.core.v3.replication.V3ReplicationProtocol.initializeReplication(V3ReplicationProtocol.java:60)\n\tat org.postgresql.core.v3.replication.V3ReplicationProtocol.startLogical(V3ReplicationProtocol.java:44)\n\tat org.postgresql.replication.fluent.ReplicationStreamBuilder$1.start(ReplicationStreamBuilder.java:38)\n\tat org.postgresql.replication.fluent.logical.LogicalStreamBuilder.start(LogicalStreamBuilder.java:41)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startPgReplicationStream(PostgresReplicationConnection.java:580)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationStream(PostgresReplicationConnection.java:414)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:301)\n\t... 10 more\n"
    does anyone know why this is happening?
    salaei
    @salaei:matrix.org
    [m]
    @Naros
    salaei
    @salaei:matrix.org
    [m]
    @jpechane:
    Armand Eidi
    @arixooo:matrix.org
    [m]
    Hi everyone, I am using Debezium Server on my laptop to see if it's possible to load CDC data from GCP Cloud SQL Server into Pub/Sub and then BigQuery.
    I am getting this error; the resource it says is not found is the instance name in GCP. The parameter in application.properties is debezium.source.database.server.name=ipm-test.
    Any idea?
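    With the Pub/Sub sink, Debezium Server publishes to topics derived from database.server.name (one per captured table) and, as far as I can tell, does not create them itself, so a NOT_FOUND resource error usually means a missing topic. A sketch, with the table-level topic name assumed:

    gcloud pubsub topics create ipm-test
    gcloud pubsub topics create ipm-test.dbo.customers    # repeat per captured table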
    Hung Truong
    @tmhung84_gitlab

    Hi, please help me, I get this error while using Docker image debezium/server:latest to CDC my Oracle database:

    {
      "exception": {
        "refId": 1,
        "exceptionType": "java.lang.ClassNotFoundException",
        "message": "io.debezium.connector.oracle.OracleConnector"
      }
    }

    My application.properties:

    debezium.sink.type=redis
    debezium.sink.redis.address=redis:6379
    debezium.sink.redis.batch.size=10
    debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
    debezium.source.offset.storage.file.filename=data/offsets.dat
    debezium.source.offset.flush.interval.ms=0
    debezium.source.database.server.name=server1
    debezium.source.database.hostname=10.11.12.12
    debezium.source.database.port=1521
    debezium.source.database.user=dbuser
    debezium.source.database.password=dbP@ssw0rd
    debezium.source.database.dbname=BON
    debezium.source.database.out.server.name=dbzxout
    debezium.source.database.connection.adapter=logminer
    debezium.source.database.tablename.case.insensitive=true
    debezium.source.table.include.list=lookup.branch_test_limited,lookup.branch_test,pick.pick_test
    debezium.source.database.tablename.case.insensitive=true
    debezium.source.database.oracle.version=12+
    debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
    quarkus.log.console.json=true
    Hung Truong
    @tmhung84_gitlab
    I downloaded the jars from the Oracle Connector tutorial and added them to the CLASSPATH, but I still face this issue.
    Hung Truong
    @tmhung84_gitlab

    I solved it by mounting the jars from the Oracle Connector tutorial into /debezium/lib inside the Docker container. But now I get this error:

    "exception": {
        "refId": 1,
        "exceptionType": "java.lang.NoSuchFieldError",
        "message": "INTERNAL_CONNECTOR_CLASS",
        "frames": [
          {
            "class": "io.debezium.storage.kafka.history.KafkaDatabaseHistory",
            "method": "<clinit>",
            "line": 169
          },
          {
            "class": "io.debezium.storage.kafka.history.KafkaStorageConfiguration",
            "method": "validateServerNameIsDifferentFromHistoryTopicName",
            "line": 17
          },
          {
            "class": "io.debezium.config.Field$Validator",
            "method": "lambda$and$0",
            "line": 232
          },
          {
            "class": "io.debezium.config.Field",
            "method": "validate",
            "line": 640
          },
          {
            "class": "io.debezium.config.Configuration",
            "method": "validate",
            "line": 1863
          },
          {
            "class": "io.debezium.config.Configuration",
            "method": "validateAndRecord",
            "line": 1879
          },
          {
            "class": "io.debezium.connector.common.BaseSourceTask",
            "method": "start",
            "line": 119
          },
          {
            "class": "io.debezium.embedded.EmbeddedEngine",
            "method": "run",
            "line": 759
          },
          {
            "class": "io.debezium.embedded.ConvertingEngineBuilder$2",
            "method": "run",
            "line": 192
          },
          {
            "class": "io.debezium.server.DebeziumServer",
            "method": "lambda$start$1",
            "line": 150
          },
          {
            "class": "java.util.concurrent.ThreadPoolExecutor",
            "method": "runWorker",
            "line": 1128
          },
          {
            "class": "java.util.concurrent.ThreadPoolExecutor$Worker",
            "method": "run",
            "line": 628
          },
          { "class": "java.lang.Thread", "method": "run", "line": 829 }
        ]
      }

    Even though I set debezium.source.database.history=io.debezium.server.redis.RedisDatabaseHistory.
    Message from logs:

    "Connector completed: success = 'false', message = 'Unable to initialize and start connector's task class 'io.debezium.connector.oracle.OracleConnectorTask' with config: {connector.class=io.debezium.connector.oracle.OracleConnector, debezium.sink.redis.batch.size=10, database.history.redis.address=redis:6379, database.tablename.case.insensitive=true, database.history.redis.ssl.enabled=false, offset.storage.file.filename=data/offsets.dat, database.out.server.name=dbzxout, database.oracle.version=11, value.converter=org.apache.kafka.connect.json.JsonConverter, key.converter=org.apache.kafka.connect.json.JsonConverter, database.user=XXXXUSER1, database.dbname=BON, offset.storage=io.debezium.server.redis.RedisOffsetBackingStore, debezium.sink.type=redis, debezium.sink.redis.address=redis:6379, database.connection.adapter=logminer, database.server.name=server1, offset.flush.timeout.ms=5000, database.port=1521, offset.flush.interval.ms=0, internal.key.converter=org.apache.kafka.connect.json.JsonConverter, database.hostname=10.11.12.12, database.password=********, name=redis, internal.value.converter=org.apache.kafka.connect.json.JsonConverter, table.include.list=lookup.branch_test_limited,lookup.branch_test,pick.pick_test, database.history=io.debezium.server.redis.RedisDatabaseHistory}', error = '{}'".

    There is a line database.history=io.debezium.server.redis.RedisDatabaseHistory, meaning my config is loaded.
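    NoSuchFieldError: INTERNAL_CONNECTOR_CLASS is the classic symptom of mixed Debezium versions: connector jars from one release loaded next to server libs from another. A quick comparison, assuming the jars were mounted from a local oracle-libs directory:

    docker run --rm --entrypoint ls debezium/server:latest /debezium/lib | grep debezium-core
    ls oracle-libs | grep debezium-connector-oracle
    # the two versions must match; pinning the image tag instead of :latest helps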

    Hung Truong
    @tmhung84_gitlab
    After reading the Debezium source code, maybe my problem is caused by a missing debezium.source.database.history.connector.class.
    Tomoya Deng
    @tomoyadeng
    Using the Oracle source connector, when producing 1000 SQL statements per second, there is some data loss: about 1-2 items every 10 minutes.
    Tomoya Deng
    @tomoyadeng
    Does anyone know what is happening? Am I missing some important configs?
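    With LogMiner at roughly 1000 changes per second, gaps are more often a mining-window problem than a Kafka one; the batch and sleep options control how fast the connector advances through the redo. A sketch of the knobs with what I believe are their defaults (illustrative, not recommendations):

    "log.mining.batch.size.min": "1000",
    "log.mining.batch.size.default": "20000",
    "log.mining.batch.size.max": "100000",
    "log.mining.sleep.time.max.ms": "3000"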
    nshah99
    @nshah99
    Hi folks. Does anyone here know if we can use Debezium with an AWS Aurora multi-master MySQL setup? AWS's website states that a multi-master setup does not support binlog replication, and hence no GTID replication, so I am wondering if Debezium can work in the absence of binlog replication.
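    Debezium's MySQL connector reads the binlog, so a cluster mode without binlog replication cannot be captured at all. A quick check against the endpoint Debezium would connect to:

    -- both must hold for the Debezium MySQL connector
    SHOW VARIABLES LIKE 'log_bin';        -- must be ON
    SHOW VARIABLES LIKE 'binlog_format';  -- must be ROW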
    Tanay Karmarkar
    @_codeplumber_twitter
    Hello all,
    Getting really slow performance on the incremental snapshot with Debezium. I am publishing to a topic with 3 partitions and a chunk size of 10000. The performance I am getting is close to 85 events per second! I am using Avro serialization and deserialization. Should I try increasing the batch size even further, or increasing the partitioning of the Kafka topic? Every couple of seconds I see 2048 events flushed, but the rest of the time it's mostly flushing 0 outstanding messages.
    Sorry, forgot to mention, I am using the postgres connector.
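    Those bursts of exactly 2048 events line up with the connector's default max.batch.size, which suggests the connector's internal queue, not the topic partitioning, is the bottleneck (incremental snapshot chunks are read single-threaded regardless of partition count). The usual knobs, with illustrative values only:

    "max.batch.size": "20480",
    "max.queue.size": "81920",
    "poll.interval.ms": "100"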