    Paulo
    @paulossjunior
    Hello, I would like to use Debezium with Apache Beam and PostgreSQL. Does anyone know of a tutorial about it?
    sujithramanathan
    @sujithramanathan
    Hi Team,
    We are using the DB2 Debezium connector and we have a CLOB column. While streaming records from the DB to the Kafka topic we are getting com.ibm.db2.jcc.am.c_@2d336dda. Can anyone help me resolve this?
    sujithramanathan
    @sujithramanathan

    Can anyone please help me with this? I'm stuck here.

    SchliffMachine
    @SchliffMachine_twitter

    Good afternoon Team!

    At the moment we are using Debezium MySQL against Aurora.
    GTID mode in MySQL is set to OFF_PERMISSIVE, so it is considered enabled by this check in MySqlConnection#isGtidModeEnabled:
    return !"OFF".equalsIgnoreCase(rs.getString(2)); - that is, anything other than OFF is treated as ON.

    Upon restarting the process we eventually end up with the error:
    The replication sender thread cannot start in AUTO_POSITION mode: this server has GTID_MODE = OFF_PERMISSIVE instead of ON

    In other words: Debezium considers OFF_PERMISSIVE enabled, but in order to progress it eventually needs this to be ON (transitively, as the error comes from the SQL client library).

    So basically (at least from what I can tell) we can't move forward from this point without changing the configuration on Aurora.
    If I'm wrong on this, please correct me; I'd be happy to be wrong!

    I believe this situation could be avoided if the use of GTIDs were configurable on the Debezium side (it's an optimization, after all, not something Debezium requires in order to work).

    Has anyone encountered this issue before?
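
    For anyone hitting the same thing: a hedged sketch of completing the transition to full GTID mode on stock MySQL (on Aurora the equivalent is editing the gtid-mode parameter in the cluster parameter group rather than SET GLOBAL):

    -- check the current state (this is the value the Debezium check reads)
    SHOW GLOBAL VARIABLES LIKE 'gtid_mode';

    -- stock MySQL only allows moving gtid_mode one step at a time,
    -- and ENFORCE_GTID_CONSISTENCY must be ON before the final step
    SET GLOBAL ENFORCE_GTID_CONSISTENCY = ON;
    SET GLOBAL GTID_MODE = ON_PERMISSIVE;  -- coming from OFF_PERMISSIVE
    SET GLOBAL GTID_MODE = ON;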

    ms-oyo
    @ms-oyo

    Hi, can anyone share how Debezium maintains its Postgres connection?

    The documentation covers the CDC logic, whereas I want to know how the connection between the connector and the Postgres host is established.

    More along the lines of: is it a pooled connection? Does the connector use a heartbeat signal to determine whether the RDS instance is up?
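
    For the liveness part of the question: Debezium exposes a heartbeat rather than a connection pool. A hedged connector-config fragment (property names from the Postgres connector docs; the heartbeat table name is an example):

    "heartbeat.interval.ms": "10000",
    "heartbeat.action.query": "INSERT INTO debezium_heartbeat (ts) VALUES (now())"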

    Aravindan C
    @aravindan.ck_gitlab
    Hi all, I'm using Debezium Server to capture changes from PostgreSQL.
    Debezium uses PostgreSQL's logical replication slot to capture changes, and the slot already remembers the LSN up to which the connector has replicated.
    Is it mandatory to have a FileOffsetBackingStore to record the connector offsets? Is there a way to skip it and just rely on PostgreSQL's data?
    1 reply
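    For context: even though the replication slot tracks the LSN on the Postgres side, the embedded engine underneath Debezium Server still requires an offset store of its own. A minimal sketch of the default file-based setup in application.properties (paths are examples):

    debezium.source.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
    debezium.source.offset.storage.file.filename=data/offsets.dat
    debezium.source.offset.flush.interval.ms=0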
    Ghost
    @ghost~628efad26da037398497505c
    Hi, can I listen to MySQL views and get their data through Debezium?
    1 reply
    Luan Chanh Tran
    @luantran1995
    Hello all, could you help me understand this part: https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-is-stopped-for-a-duration?
    I stopped Debezium, inserted some data into PostgreSQL, and then restarted it, but Debezium did not catch up and send the missed changes to Kafka. Is there any config missing in the code below?
    @Bean
    public io.debezium.config.Configuration postgresConnector() throws IOException {
        // offsets and DB history are written to freshly created temp files
        File offsetStorageTempFile = File.createTempFile("offsets", ".dat");
        File dbHistoryTempFile = File.createTempFile("dbhistory", ".dat");
        return io.debezium.config.Configuration.create()
                .with("name", "pg-connector")
                .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                .with("offset.flush.interval.ms", "0")
                .with("database.hostname", postgresDbHost)
                .with("database.port", postgresDbPort)
                .with("database.user", postgresDbUsername)
                .with("database.password", postgresDbPassword)
                .with("database.dbname", postgresDbName)
                .with("database.include.list", postgresDbName)
                .with("database.server.name", "PostgreSQL")
                .with("plugin.name", "pgoutput")
                .with("table.whitelist", "public.t_se_interface,public.g_individu")
                .with("snapshot.mode", "initial")
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
                .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
                .build();
    }
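    One thing worth checking in the snippet above (a guess, not a confirmed diagnosis): File.createTempFile creates a brand-new file on every restart, so the offsets recorded before the shutdown are lost and the connector cannot resume from them. A sketch using fixed paths instead:

    // fixed, persistent locations so offsets and history survive restarts
    // (paths are examples; point them at a durable directory)
    File offsetStorageFile = new File("/var/debezium/pg-offsets.dat");
    File dbHistoryFile = new File("/var/debezium/pg-dbhistory.dat");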
    sribarrow
    @sribarrow
    {
      "name": "emp-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "emp",
        "database.server.name": "localhost",
        "database.whitelist": "emp",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.emp"
      }
    }
    {"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nError while validating connector config: The connection attempt failed.\nYou can also find the above list of errors at the endpoint /connector-plugins/{connectorType}/config/validate"}
    I am trying to follow this: https://hevodata.com/learn/connecting-kafka-to-postgresql/#m1 and I get this issue when I try to start the connector.
    I had a few issues with Postgres where I had to set up the env variable. I have even reset the postgres password. Just not sure what should be changed. Should I use an IP instead of localhost?
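    A hedged way to narrow this down is the Connect REST API's validate endpoint, which reports exactly which fields fail (worker host/port assumed):

    curl -s -X PUT -H "Content-Type: application/json" \
      http://localhost:8083/connector-plugins/io.debezium.connector.postgresql.PostgresConnector/config/validate \
      -d '{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"emp","database.server.name":"localhost"}'

    If Connect runs in Docker, "postgres" must resolve from inside the Connect container (a compose service name works; localhost would not).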
    sivaaspire
    @sivaaspire:matrix.org [m]
    azureuser@app:~/strimzi-kafka-operator/templates$ kubectl get kctr inventory-connector -o yaml
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      annotations:
        kubectl.kubernetes.io/last-applied-configuration: |
          {"apiVersion":"kafka.strimzi.io/v1beta2","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster-debezium"},"name":"inventory-connector","namespace":"default"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.allowPublicKeyRetrieval":"true","database.history.kafka.bootstrap.servers":"fqcevent.servicebus.windows.net:9093","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"#####","database.password":"######","database.port":"3306","database.server.id":"1","database.server.name":"app","database.user":"root","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
      creationTimestamp: "2022-06-15T08:18:16Z"
      generation: 1
      labels:
        strimzi.io/cluster: my-connect-cluster-debezium
      name: inventory-connector
      namespace: default
      resourceVersion: "627127"
      uid: f416dbd4-6781-48f4-8b4d-6f4ae6c0d439
    spec:
      class: io.debezium.connector.mysql.MySqlConnector
      config:
        database.allowPublicKeyRetrieval: "true"
        database.history.kafka.bootstrap.servers: fqcevent.servicebus.windows.net:9093
        database.history.kafka.topic: schema-changes.inventory
        database.hostname: '#########'
        database.password: '#########'
        database.port: "3306"
        database.server.id: "1"
        database.server.name: app
        database.user: root
        database.whitelist: inventory
        include.schema.changes: "true"
      tasksMax: 1
    status:
      conditions:
      - lastTransitionTime: "2022-06-15T08:18:18.031523Z"
        message: 'GET /connectors/inventory-connector/topics returned 404 (Not Found):
          Unexpected status code'
        reason: ConnectRestException
        status: "True"
        type: NotReady
      observedGeneration: 1
      tasksMax: 1
      topics: []
    Shantnu Jain
    @shantnuj_gitlab

    I have the following setup
    Oracle -> Kafka -> PostgreSQL
    Source Connector config is

    {
        "name":"myfirst-connector",
        "config":{
            "connector.class":"io.debezium.connector.oracle.OracleConnector",
            "tasks.max":"1",
            "database.hostname":"192.168.29.102",
            "database.port":"1521",
            "database.user":"c##dbzuser",
            "database.password":"dbz",
            "database.dbname":"ORCLCDB",
            "database.pdb.name":"ORCLPDB1",
            "database.server.name":"oracle19",
            "database.connection.adapter":"logminer",
            "database.history.kafka.topic":"schema_changes",
            "database.history.kafka.bootstrap.servers":"192.168.29.102:9092",
            "database.tablename.case.insensitive":"true",
            "snapshot.mode":"initial",
            "tombstones.on.delete":"true",
            "include.schema.changes": "true",
            "sanitize.field.names":"true",
            "key.converter":"org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable":"true",
            "value.converter":"org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable":"true",
            "time.precision.mode": "connect",
            "database.oracle.version":19
        }
    }

    Sink connector config is

    {
        "name": "myjdbc-sink-testdebezium",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "topics.regex": "oracle19.C__DBZUSER.*",
            "connection.url": "jdbc:postgresql://192.168.29.102:5432/postgres?user=puser&password=My19cPassword",
            "key.converter":"org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable":"true",
            "value.converter":"org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable":"true",
            "dialect.name": "PostgreSqlDatabaseDialect",
            "auto.create": "true",
            "auto.evolve": "true",
            "insert.mode": "upsert",
            "delete.enabled": "true",
            "transforms": "unwrap, RemoveString, TimestampConverter",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.delete.handling.mode": "none",
            "transforms.RemoveString.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.RemoveString.regex": "(.*)\\.C__DBZUSER\\.(.*)",
            "transforms.RemoveString.replacement": "$2",
            "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
            "transforms.TimestampConverter.target.type": "Timestamp",
            "transforms.TimestampConverter.field": "dob",
            "pk.mode": "record_key"
        }
    }

    Now when I drop a table in Oracle I get an entry in the schema_changes topic, but the table is not dropped from PostgreSQL. I need help figuring out why the drop is not getting propagated. Just FYI, all the other operations (Create Table, Alter Table, Insert, Update, Delete) are working fine. Only DROP is not working, and I am not getting any exception either.

    David Daniel Arch
    @daviddanielarch_twitter
    hi all, I've set up MSK Connect with Debezium but forgot to attach to the connector the configuration that sets key.converter and value.converter to JsonConverter. All the topics have already been populated with my Postgres data. Does anybody know if changing the converters now means I need to sync Postgres with Kafka again?
    Daan Bosch
    @spinside007_gitlab
    Is there a way to use authentication when connecting to Postgres? Setting trust in pg_hba.conf is not secure, according to this blog: https://medium.com/@lmramos.usa/debezium-cdc-postgres-c9ce4da05ce1
    Is there a way to make sure SCRAM authentication is used? Or md5?
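    For reference, a minimal sketch of forcing SCRAM for the Debezium user (standard PostgreSQL configuration, not Debezium-specific; the user name and CIDR are examples):

    # postgresql.conf: hash newly set passwords with SCRAM
    password_encryption = 'scram-sha-256'

    # pg_hba.conf: require SCRAM for both the regular and the replication connection
    host    all          debezium  10.0.0.0/8  scram-sha-256
    host    replication  debezium  10.0.0.0/8  scram-sha-256

    Note that the password has to be re-set after changing password_encryption so it is stored as a SCRAM verifier.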
    koevet
    @koevet:matrix.org [m]

    hello, I'm trying to set up Debezium with Oracle and I'm following this guide: https://github.com/debezium/debezium-examples/tree/main/tutorial#using-oracle

    I have the various components running (Oracle, Kafka, etc) and when I try to register the Debezium Oracle connector I get this error:

    {"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nUnable to connect: Failed to resolve Oracle database version\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
    I can't find anything wrong in the configuration, and I can connect to the dockerized Oracle on localhost:1521 without problems.
    For reference, this is the config that I'm pushing to the connector:
    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        "tasks.max": "1",
        "database.server.name": "server1",
        "database.hostname": "localhost",
        "database.port": "1521",
        "database.user": "c##dbzuser",
        "database.password": "dbz",
        "database.dbname": "ORCLCDB",
        "database.pdb.name": "ORCLPDB1",
        "database.connection.adapter": "logminer",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
      }
    }
    1 reply
    finally, this is the stack trace from the connector: https://gist.github.com/luciano-fiandesio/0a78e6b2d3aeb9e1b56269eb2ad4715d
    I have set up Debezium before on Postgres and I didn't have a single problem; any idea where the problem may be?
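    One hedged guess, since the registration config above uses "database.hostname": "localhost": if Connect itself runs in a container, localhost points at the Connect container rather than the Oracle one, so the hostname would need to be the Oracle service name from the compose file, e.g.:

    "database.hostname": "oracle"

    (the actual service name depends on the tutorial's docker-compose file; this is an assumption, not a confirmed diagnosis)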
    leexuGit
    @leexuGit
    Does CDC support Oracle 10G?
    Butter Ngo
    @butterngo
    Hi team, I'm now getting a problem with message filtering: "Validation error: transforms.filter.type - Invalid value io.debezium.transforms.Filter for configuration transforms.filter.type: Class io.debezium.transforms.Filter could not be found." I added the plugins following the documentation (https://debezium.io/documentation/reference/1.3/configuration/filtering.html), but is there any configuration missing? Please help me.
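    A hedged note for this one: the Filter SMT is not part of the connector archive itself; it ships in the debezium-scripting module, and that JAR plus a JSR-223 language implementation (typically Groovy) must sit in the same plugin directory as the connector, roughly like this (version numbers are examples):

    /kafka/connect/debezium-connector-mysql/
      debezium-scripting-1.3.1.Final.jar
      groovy-3.0.7.jar
      groovy-jsr223-3.0.7.jar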
    Marcos
    @mmaia

    I have a connector that finishes the snapshot phase but then fails every time. I've been looking into the issue for a couple of hours and can't find it. Any ideas?

    connect  | [2022-07-10 13:19:44,310] INFO Snapshot ended with SnapshotResult [status=COMPLETED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=197, currentRowNumber=0, serverId=0, sourceTime=2022-07-10T11:19:44.308Z, threadId=-1, currentQuery=null, tableIds=[userTransactions.EUR], databaseName=userTransactions], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=1d8b8ba9-0042-11ed-b7a0-0242c0a89006:1-114, currentGtidSet=1d8b8ba9-0042-11ed-b7a0-0242c0a89006:1-114, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=197, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] (io.debezium.pipeline.ChangeEventSourceCoordinator)
    connect  | [2022-07-10 13:19:44,310] INFO Requested thread factory for connector MySqlConnector, id = userTransactions named = binlog-client (io.debezium.util.Threads)
    connect  | [2022-07-10 13:19:44,317] INFO Requested thread factory for connector MySqlConnector, id = userTransactions named = kafka-signal (io.debezium.util.Threads)
    connect  | [2022-07-10 13:19:44,317] ERROR Producer failure (io.debezium.pipeline.ErrorHandler)
    connect  | java.lang.NullPointerException
    connect  |     at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
    connect  |     at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
    connect  |     at java.base/java.util.Properties.put(Properties.java:1340)
    connect  |     at java.base/java.util.Properties.setProperty(Properties.java:228)
    connect  |     at io.debezium.config.Configuration$Builder.withDefault(Configuration.java:687)
    connect  |     at io.debezium.connector.mysql.signal.KafkaSignalThread.<init>(KafkaSignalThread.java:101)
    connect  |     at io.debezium.connector.mysql.MySqlReadOnlyIncrementalSnapshotChangeEventSource.<init>(MySqlReadOnlyIncrementalSnapshotChangeEventSource.java:91)
    connect  |     at io.debezium.connector.mysql.MySqlChangeEventSourceFactory.getIncrementalSnapshotChangeEventSource(MySqlChangeEventSourceFactory.java:91)
    ...

    and then:

    connect  | [2022-07-10 13:19:45,074] ERROR WorkerSourceTask{id=Wallets-Debezium-Mysql-UserTransactions-V2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
    connect  | org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    connect  |     at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
    connect  |     at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:116)
    connect  |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    connect  |     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    connect  |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    connect  |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    connect  |     at java.base/java.lang.Thread.run(Thread.java:829)
    connect  | Caused by: java.lang.NullPointerException
    connect  |     at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
    connect  |     at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
    connect  |     at java.base/java.util.Properties.put(Properties.java:1340)
    connect  |     at java.base/java.util.Properties.setProperty(Proper
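    A hedged reading of the stack trace above: KafkaSignalThread is only constructed for read-only incremental snapshots, and its constructor applies defaults built from the signal Kafka properties, so a missing value there could produce exactly this NullPointerException. The config fragment it would correspond to (property names from the MySQL connector docs; values are examples):

    "read.only": "true",
    "signal.kafka.topic": "dbz-signals",
    "signal.kafka.bootstrap.servers": "kafka:9092"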
    Naveenkumar seerangan
    @naveenseerangan:matrix.org [m]
    Hi,
    Yarin Benado
    @yarinb
    Is there any way to remove a table from an "in-progress" incremental snapshot? It's a huge table and we'd like to stop the snapshot process for this specific table.
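    For reference, recent Debezium versions support a stop-snapshot signal that removes tables from a running incremental snapshot. A hedged SQL sketch, assuming a signal table is configured via signal.data.collection (table and collection names are examples):

    INSERT INTO debezium_signal (id, type, data)
    VALUES ('stop-1', 'stop-snapshot',
            '{"data-collections": ["inventory.huge_table"], "type": "incremental"}');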
    ajinkya-centime
    @ajinkya-centime
    I'm trying to reroute the Debezium messages from all the different topics to the topic "cdc_event" with the following config:
    {
      "name": "centime-debezium-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "{{db_endpoint}}",
        "database.port": "9909",
        "database.user": "{{user}}",
        "database.password": "{{pwd}}",
        "database.server.id": "7799838800",
        "database.server.name": "dev_db_local_test",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "dbhistory",
        "table.include.list": "CENTIME_GL_DB.bill_payment_schedule_cdc,CENTIME_PAYMENT_DB.payments_cdc",
        "event.processing.failure.handling.mode": "fail",
        "include.schema.changes": "true",
        "time.precision.mode": "connect",
        "decimal.handling.mode": "double",
        "transforms": "reRoute",
        "transforms.reRoute.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.reRoute.regex": "^.(cdc|bill|payments).$",
        "transforms.reRoute.replacement": "cdc_events"
      }
    }
    Whenever I register the Debezium connector on Kafka Connect I get the following error:
    [2022-07-26 12:21:22,403] INFO [centime-debezium-connector|task-0] EnrichedConnectorConfig values:
    config.action.reload = restart
    connector.class = io.debezium.connector.mysql.MySqlConnector
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = null
    name = centime-debezium-connector
    predicates = []
    tasks.max = 1
    topic.creation.groups = []
    transforms = [reRoute]
    transforms.reRoute.key.enforce.uniqueness = true
    transforms.reRoute.key.field.regex = null
    transforms.reRoute.key.field.replacement = null
    transforms.reRoute.negate = false
    transforms.reRoute.predicate =
    transforms.reRoute.topic.regex = null
    transforms.reRoute.topic.replacement = null
    transforms.reRoute.type = class io.debezium.transforms.ByLogicalTableRouter
    value.converter = null
    (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:376)
    [2022-07-26 12:21:22,404] ERROR [centime-debezium-connector|task-0] The 'topic.regex' value is invalid: A value is required (io.debezium.transforms.ByLogicalTableRouter:1881)
    [2022-07-26 12:21:22,404] ERROR [centime-debezium-connector|task-0] The 'topic.replacement' value is invalid: A value is required (io.debezium.transforms.ByLogicalTableRouter:1881)
    [2022-07-26 12:21:22,405] ERROR [centime-debezium-connector|task-0] Failed to start task centime-debezium-connector-0 (org.apache.kafka.connect.runtime.Worker:550)
    org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.ConnectException: Unable to validate config.
    at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:295)
    at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:593)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:545)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1421)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$22(DistributedHerder.java:1434)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.kafka.connect.errors.ConnectException: Unable to validate config.
    at io.debezium.transforms.ByLogicalTableRouter.configure(ByLogicalTableRouter.java:180)
    at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:284)
    ... 8 more
    From the logs I see that even though I'm passing the topic regex pattern and replacement, they show up as null when the config is printed.
    Can anybody help? What is wrong with the config?
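    For what it's worth, the two ERROR lines in the log point at the likely cause: ByLogicalTableRouter reads topic.regex and topic.replacement, while the config above sets transforms.reRoute.regex and transforms.reRoute.replacement, so the expected keys stay null. A hedged corrected fragment (the * wildcards in the regex also look like they were eaten by chat formatting, an assumption on my part):

    "transforms": "reRoute",
    "transforms.reRoute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.reRoute.topic.regex": "^.*(cdc|bill|payments).*$",
    "transforms.reRoute.topic.replacement": "cdc_events"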
    JFercan
    @JFercan
    Will a connector keep the database connection open even when both Connector and Tasks are paused? This is for a connection to MS SQL. Thank you.
    Suryaprakash S
    @suryaares
    I am getting this error while streaming data from Debezium Server to Redis on two separate EC2 instances.
    2022-08-03 11:27:47,711 ERROR [io.qua.run.Application] (main) Failed to start application (with profile prod): java.net.ConnectException: Connection refused (Connection refused)
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
    at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.base/java.net.Socket.connect(Socket.java:609)
    at redis.clients.jedis.DefaultJedisSocketFactory.createSocket(DefaultJedisSocketFactory.java:53)
    at redis.clients.jedis.Connection.connect(Connection.java:163)
    at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:113)
    at redis.clients.jedis.Connection.sendCommand(Connection.java:119)
    at redis.clients.jedis.Connection.sendCommand(Connection.java:110)
    at redis.clients.jedis.BinaryClient.auth(BinaryClient.java:649)
    at redis.clients.jedis.BinaryJedis.auth(BinaryJedis.java:2416)
    at io.debezium.server.redis.RedisStreamChangeConsumer.connect(RedisStreamChangeConsumer.java:85)
    at io.debezium.server.redis.RedisStreamChangeConsumer_Bean.create(RedisStreamChangeConsumer_Bean.zig:704)
    at io.debezium.server.redis.RedisStreamChangeConsumer_Bean.create(RedisStreamChangeConsumer_Bean.zig:720)
    at io.debezium.server.DebeziumServer.start(DebeziumServer.java:115)
    at io.debezium.server.DebeziumServer_Bean.create(DebeziumServer_Bean.zig:256)
    at io.debezium.server.DebeziumServer_Bean.create(DebeziumServer_Bean.zig:272)
    at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:96)
    at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:29)
    at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:26)
    at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:26)
    at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
    at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:26)
    at io.quarkus.arc.impl.ClientProxies.getApplicationScopedDelegate(ClientProxies.java:17)
    at io.debezium.server.DebeziumServer_ClientProxy.arc$delegate(DebeziumServer_ClientProxy.zig:67)
    at io.debezium.server.DebeziumServer_ClientProxy.arc_contextualInstance(DebeziumServer_ClientProxy.zig:82)
    at io.debezium.server.DebeziumServer_Observer_Synthetic_d70cd75bf32ab6598217b9a64a8473d65e248c05.notify(DebeziumServer_Observer_Synthetic_d70cd75bf32ab6598217b9a64a8473d65e248c05.zig:94)
    at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:300)
    at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:282)
    at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:70)
    at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:128)
    at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:97)
    at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(LifecycleEventsBuildStep$startupEvent1144526294.zig:87)
    at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(LifecycleEventsBuildStep$startupEvent1144526294.zig:40)
    at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:623)
    at io.quarkus.runtime.Application.start(Application.java:101)
    at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:101)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:66)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:42)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:119)
    at io.debezium.server.Main.main(Main.java:15)
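    For reference, "Connection refused" here happens before Debezium does anything CDC-related: Jedis cannot open the socket to Redis. A hedged application.properties fragment (property names from the Debezium Server docs; the address must be reachable from the Debezium Server instance, values are examples):

    debezium.sink.type=redis
    debezium.sink.redis.address=10.0.1.23:6379
    debezium.sink.redis.password=<redacted>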
    Romil Punetha
    @romilpunetha

    I'm trying to use the MongoDB outbox pattern with the payload as a byte[]. I'm using the quarkus-avro library, which converts my userEvent.avsc file into a UserEvent.java class that has a built-in encoder returning the byte[] for the class. I use a mapper to convert my original User.java instance into the UserEvent.java instance, then I convert this into a byte[] and store it as the payload in the outbox table for MongoDB.

    When I use Debezium with the io.debezium.converters.ByteArrayConverter value converter I get an alphanumeric string in Kafka. When my consumer tries to consume this as a byte[], it throws an error like "unidentified hex value".
    I have other consumers that read from the Kafka topic and have configs that accept an Avro schema and a byte[] and create the Java instance out of it. However, the byte sequence seems to differ after passing through MongoDB: when I print the encoded byte[] vs. the one I read from Kafka, they are different. Any idea what to do here? The idea is that the Avro byte[] should be passed as-is to Kafka, so consumers can read it in a decode function.

    1 reply
    wangshuang0220
    @wangshuang0220
    hi all, has anyone met this error: Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'jdbc-sink7' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='t2',partition=0,offset=7,timestamp=1660188308616) with a null value and null value schema.
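    For reference, that error usually means a Debezium delete event or tombstone (null value) reached a sink that cannot handle deletes. Two hedged config fragments, depending on the intent:

    # drop deletes before they reach the sink
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "drop"

    # or let the sink apply deletes
    "delete.enabled": "true",
    "pk.mode": "record_key"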
    wangshuang0220
    @wangshuang0220
    Can the Confluent JDBC sink connector sync MySQL functions and procedures?
    kr1929uti
    @kr1929uti
    Hi everyone, I have a Kafka setup with Postgres as my source and sink. I am trying to implement a scenario where any DDL change on the Postgres source (such as column addition, deletion, update of a column, or column type change) should be reflected in my sink Postgres table. I already have auto.evolve=true in my sink connector configuration (using the JDBC sink connector), but it is not fulfilling the requirements. Any suggestions on this?
    kr1929uti
    @kr1929uti
    Hi all,
    Whenever I encounter schema changes (incoming DDL changes), I want to automate the table backup, table deletion, and then table creation with the new schema. (I am using a Postgres DB and I have a Kafka setup.) Any suggestions on how to go about the automation?
    rishimathur14
    @rishimathur14
    Hi Team, we are facing the below error frequently even though we have set retention.ms to 31 years and cleanup.policy to delete:
    bash-4.2$ curl -X GET -H "Accept:application/json" http://kafka-connect-service:8083/connectors/db2-connect/status
    {"name":"db2-connect-rdcdev-client-wsdiwwe","connector":{"state":"RUNNING","worker_id":"XXXX:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"XXXX:8083","trace":"io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.\n\tat io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:47)\n\tat io.debezium.connector.db2.Db2ConnectorTask.start(Db2ConnectorTask.java:88)\n\tat
    salaei
    @salaei:matrix.org [m]
    Hi everyone, we are using the Debezium connector to replicate data from Postgres into Kafka. The connector fails because it tries to create a new pg slot with the same name while the slot exists and is inactive. The exact error is this:
    "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)\n\tat io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:172)\n\tat io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:41)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:139)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: io.debezium.DebeziumException: Failed to start replication stream at LSN{12D1/80663BD0}; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:309)\n\tat io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:129)\n\t... 9 more\nCaused by: org.postgresql.util.PSQLException: ERROR: replication slot \"revmgmt_debezium_1\" is active for PID 30294\n\tat org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)\n\tat org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1263)\n\tat org.postgresql.core.v3.QueryExecutorImpl.startCopy(QueryExecutorImpl.java:945)\n\tat org.postgresql.core.v3.replication.V3ReplicationProtocol.initializeReplication(V3ReplicationProtocol.java:60)\n\tat org.postgresql.core.v3.replication.V3ReplicationProtocol.startLogical(V3ReplicationProtocol.java:44)\n\tat org.postgresql.replication.fluent.ReplicationStreamBuilder$1.start(ReplicationStreamBuilder.java:38)\n\tat org.postgresql.replication.fluent.logical.LogicalStreamBuilder.start(LogicalStreamBuilder.java:41)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startPgReplicationStream(PostgresReplicationConnection.java:580)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationStream(PostgresReplicationConnection.java:414)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:301)\n\t... 10 more\n"
    does anyone know why this is happening?
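    A hedged pair of queries for inspecting the situation from the Postgres side (the slot name and PID come from the error message above):

    SELECT slot_name, active, active_pid
    FROM pg_replication_slots
    WHERE slot_name = 'revmgmt_debezium_1';

    -- if an orphaned backend still holds the slot, it can be terminated (with care)
    SELECT pg_terminate_backend(30294);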
    salaei
    @salaei:matrix.org [m]
    @Naros
    salaei
    @salaei:matrix.org [m]
    @jpechane:
    Armand Eidi
    @arixooo:matrix.org [m]