    Jiamin WANG
    @Jiamin-WANG
    Hi, do you have any idea how to clear the error message in tasks->trace? I've tried restarting the task and Connect and clearing the log, but the errors are still there.
    8 replies
    Arne Huang
    @ArneHuang_twitter

    Using the Debezium engine, is there a way to access engine internals from the processor, i.e. in handleBatch()?

    Use case: instead of sending the DDL statement as a string when the schema changes, I want to send the Schema object (maybe as JSON), since it already uses the Debezium parser, so I don't need to reparse the statement on the other side.

    3 replies
    payasgupta267
    @payasgupta267
    Hi, is there any relevant documentation on the Debezium parallelism model? I want to know more about ways/configurations to increase connector throughput. Can someone please help me with this?
    1 reply
    saurabh8774
    @saurabh8774

    Hello, I am using S3 sink connector and want to have a custom bucket name
    topic-name : mlp-emp
    bucketname: emp

    "transforms":"dropPrefix",      
    "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",  
    "transforms.dropPrefix.regex":"mlp-(.*)",  
    "transforms.dropPrefix.replacement":"$1"

    Tried using the regex but it is failing with a NullPointerException. Any ideas?
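    For reference, a minimal sketch of how the RegexRouter transform is usually embedded in a complete S3 sink submission; the connector name, region, format and flush settings below are illustrative assumptions, not a confirmed fix for the NullPointerException. Note that s3.bucket.name is fixed per connector; the RegexRouter only rewrites the topic name used in the object path.

        # hypothetical full S3 sink config including the dropPrefix transform
        curl -X POST -H "Content-Type: application/json" localhost:8083/connectors -d '{
          "name": "s3-sink-emp",
          "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max": "1",
            "topics": "mlp-emp",
            "s3.bucket.name": "emp",
            "s3.region": "us-east-1",
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
            "flush.size": "1000",
            "transforms": "dropPrefix",
            "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.dropPrefix.regex": "mlp-(.*)",
            "transforms.dropPrefix.replacement": "$1"
          }
        }'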

    2 replies
    Balaji Mohandas
    @BalajiMohandas

    Hi there, I am trying to set up a Postgres sink based on the debezium/connect image. I am following Jiri's article (https://debezium.io/blog/2017/09/25/streaming-to-another-database/). I brought up my Docker containers using docker-compose, and I copied the Kafka JDBC connector jar to the connect container
    '''
    docker cp kafka-connect-jdbc-5.4.1.jar tutorial_connect_1:/kafka/connect/kafka-connect-jdbc
    '''
    and I tried to set up the sink connector using Postman
    '''
    POST: localhost:8083/connectors
    {
    "name": "jdbc-postgres-sink",
    "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "customers",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgres&password=postgres",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"
    }
    }
    '''
    I am getting an error as "Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector, available connectors are: PluginDesc{.."

    Can you please advise whether I have missed any configuration, or where I should look in the logs to see whether this jar was accepted by the connector or failed to load?
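    For what it's worth, a sketch of the usual pattern with the debezium/connect image, assuming its default plugin path of /kafka/connect; the worker only scans the plugin path at startup, so the container needs a restart after the jar is copied in.

        # put the jar in its own directory under the plugin path
        docker exec tutorial_connect_1 mkdir -p /kafka/connect/kafka-connect-jdbc
        docker cp kafka-connect-jdbc-5.4.1.jar tutorial_connect_1:/kafka/connect/kafka-connect-jdbc/
        # restart so Kafka Connect rescans its plugins
        docker restart tutorial_connect_1
        # verify the sink connector class is now visible
        curl -s localhost:8083/connector-plugins | grep -i jdbc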

    2 replies
    hanoisteve
    @hanoisteve
    Print the full null pointer exception, line in code, etc.
    hanoisteve
    @hanoisteve
    Can someone tell me if the outbox router is supposed to create a topic based on the content stored in the outbox table? How many topics does using the outbox router create?
    Right now I am just seeing the outbox_event topic generated, but I get topics for all my tables.
    I need to confirm my install of the outbox router ASAP.
    hanoisteve
    @hanoisteve
    So right now I am streaming ALL of my tables, including the outbox table.
    But I am not seeing any additional topics generated when I store a value in the outbox table.
    So is there a log line I should be seeing from the outbox router?
    Does the outbox router have an option to turn on logging and show what it does with the values stored in the outbox table?
    If there was an error processing a column etc., where would I see it?
    Jiri Pechanec
    @jpechane
    @hanoisteve This issue has been dragging on for too long. Would you please be able to prepare a docker-compose file with your setup and the configuration files you are using, so we can try to reproduce the problem on our side?
    hanoisteve
    @hanoisteve
    Do you log what the outbox transform is doing? Just a single line to show it found something, and whether there are errors during processing?
    17 replies
    I am just looking for hints: if it does not produce a topic, what might be wrong? And for an approach to troubleshooting this type of issue.
    Kushal Reddy
    @kushalreddy_gitlab
    @jpechane I deleted the failed connector & db.history topic and registered a new connector with the same name & db.history topic. We are facing the error below. Can you please help me with this? {
    "name": "CONNECTOR-DOB-GP1",
    "connector": {
    "state": "RUNNING",
    "worker_id": "localhost:8083"
    },
    "tasks": [
    {
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.ConnectException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to SCHEMA_ONLY_RECOVERY\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:103)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:101)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n",
    "id": 0,
    "worker_id": "localhost:8083"
    }
    ],
    "type": "source"
    }
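    For reference, a rough sketch of the schema_only_recovery path suggested by the error, using the Connect REST API; the database host, credentials and topic names below are placeholders, so treat this as an illustration rather than a verified fix.

        # PUT the full connector config back with snapshot.mode set to schema_only_recovery
        curl -X PUT -H "Content-Type: application/json" \
          localhost:8083/connectors/CONNECTOR-DOB-GP1/config -d '{
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": "mysql-host",
            "database.port": "3306",
            "database.user": "debezium",
            "database.password": "dbz",
            "database.server.id": "184054",
            "database.server.name": "dbserver1",
            "database.history.kafka.bootstrap.servers": "kafka:9092",
            "database.history.kafka.topic": "db.history",
            "snapshot.mode": "schema_only_recovery"
          }'
        # restart the failed task once the new config is accepted
        curl -X POST localhost:8083/connectors/CONNECTOR-DOB-GP1/tasks/0/restart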
    37 replies
    Kushal Reddy
    @kushalreddy_gitlab
    @jpechane
    Jiri Pechanec
    @jpechane
    @kushalreddy_gitlab That's only a subset
    Kushal Reddy
    @kushalreddy_gitlab
    @jpechane
    Raj
    @rajma996_gitlab
    I have a question about the Debezium documentation. At https://debezium.io/documentation/faq/#how_to_change_the_offsets_of_the_source_database, it says "To move back to a previous offset the connector should be stopped and the following command has to be issued:"
    By "stopping the connector", does that mean pausing the connector, or completely removing the connector from Kafka Connect?
    Jiri Pechanec
    @jpechane
    @rajma996_gitlab Hi, it is better to remove it
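    In practice "remove" here usually means deleting the connector over the REST API, adjusting the stored offsets as described in the FAQ, and then re-registering it; a minimal sketch (connector and file names are placeholders):

        # delete the connector; its committed offsets remain in the offsets topic
        curl -X DELETE localhost:8083/connectors/my-connector
        # ...edit the offsets as described in the FAQ...
        # re-register the connector with its original configuration
        curl -X POST -H "Content-Type: application/json" localhost:8083/connectors -d @my-connector.json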
    7 replies
    Kushal Reddy
    @kushalreddy_gitlab
    @jpechane
    tejashjl
    @tejashjl
    Hi,
    I have been using Debezium (the MySQL connector) in production for nearly a year. Recently we tried to add triggers in MySQL, and this is causing the connectors to fail. I tried the latest releases, but nothing worked.
    17 replies
    Any suggestions on this?
    Mike Kamornikov
    @mikekamornikov
    @jpechane Is there any real need to specify the partition while manipulating offsets (looking through the FAQ)? We have a unique key and Debezium doesn't specify a partitioner class (so it should be the default partitioner).
    1 reply
    chrisdrew1
    @chrisdrew1
    Debezium and Oracle... Is it supposed to be able to replicate CDC of CLOB columns? If not, is the column blacklist functionality working?
    Jeremy Finzel
    @jfinzel
    Has anyone seen this before, where a Kafka Connect task gets unassigned and is never re-assigned? This is the error I got in my logs. I ended up having to restart the containers to get it to assign again.
    May 30 16:55:17 debezium-postgres-node01 docker-compose[96950]: connect_1  | 2020-05-30 21:55:17,016 INFO   ||  [Worker clientId=connect-1, groupId=debezium-postgres-staging] Broker coordinator was unreachable for 3000ms. Revoking previous assignment Assignment{error=0, leader='connect-1-d66568f8-e2e9-4e70-be4c-758cce9db37f', leaderUrl='http://2.1.0.3:8083/', offset=1597, connectorIds=[foo,bar], taskIds=[foo-0, bar-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} to avoid running tasks while not being a member the group   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
    1 reply
    antsmartian
    @antsmartian

    Hello, I tried to set up Debezium and connect to Kafka using Java (fully dockerized), but for some reason I wasn't able to get the Java code to connect to Kafka. I tried asking the question on SO; I'm just reposting it here:

    My docker compose looks like:

    version: '3.1'
    services:
        postgres:
            image: debezium/postgres
            environment:
              POSTGRES_PASSWORD: qwerty
              POSTGRES_USER: appuser
            volumes:
               - ./postgres:/data/postgres
            ports:
              - 6532:6532
        zookeeper:
            image: confluentinc/cp-zookeeper
            ports:
              - "2181:2181"
            environment:
              ZOOKEEPER_CLIENT_PORT: 2181
        kafka:
            image: confluentinc/cp-kafka
            depends_on:
              - zookeeper
              - postgres
            ports:
              - "9092:9092"
            environment:
              KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
              KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
              KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
              KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 5000
              KAFKA_BROKER_ID: 1
              KAFKA_MIN_INSYNC_REPLICAS: 1
              KAFKA_ADVERTISED_HOST_NAME: kafka
        connector:
            image: debezium/connect:latest
            ports:
              - "8083:8083"
            environment:
              GROUP_ID: 1
              CONFIG_STORAGE_TOPIC: my_connect_configs
              OFFSET_STORAGE_TOPIC: my_connect_offsets
              BOOTSTRAP_SERVERS: kafka:9092
            depends_on:
              - zookeeper
              - postgres
              - kafka
        app-server:
        # Configuration for building the docker image for the backend service
            build:
              context: . # Use an image built from the specified dockerfile in the `polling-app-server` directory.
              dockerfile: Dockerfile
            ports:
              - "8080:8080" # Forward the exposed port 8080 on the container to port 8080 on the host machine
            restart: always
            depends_on: 
              - kafka 
              - zookeeper
              - postgres
              - connector
            environment:
              BOOTSTRAP_SERVERS: kafka:9092

    But in my Java code, if I do:

    System.getenv("bootstrap.servers")
    // or
    System.getenv("BOOTSTRAP_SERVERS")

    I still get null. Here is my Dockerfile for the Spring app:

    FROM maven:3-jdk-11 as builder
    # create app folder for sources
    RUN mkdir -p /build
    WORKDIR /build
    COPY pom.xml /build
    #Download all required dependencies into one layer
    RUN mvn -B dependency:resolve dependency:resolve-plugins
    #Copy source code
    COPY src /build/src
    # Build application
    RUN mvn package
    
    FROM openjdk:8-jdk-alpine
    RUN addgroup -S spring && adduser -S spring -G spring
    USER spring:spring
    ARG JAR_FILE=target/*.jar
    COPY ${JAR_FILE} app.jar
    ENTRYPOINT ["java","-jar","/app.jar"]

    Any hints as to what mistake I'm making here?
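    One hedged suggestion: check whether the variable is actually present inside the running container, since System.getenv() only sees what docker-compose injected into that process's environment.

        # confirm the variable reached the app container
        docker-compose exec app-server env | grep BOOTSTRAP_SERVERS
        # if it is missing here, the Java process cannot see it either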

    3 replies
    Eric Weaver
    @eric-weaver
    Hello, we're experiencing some issues with Debezium that I haven't seen before. We're running a fairly old version, 0.7.5, at the moment, and we have a 3-node Kafka cluster. The cluster went through a clean leader election, and our Debezium connectors have since been failing with the final exception
    Caused by: java.lang.IllegalStateException: The database history couldn't be recovered. Consider to increase the value for database.history.kafka.recovery.poll.interval.ms
    Attempted to increase database.history.kafka.recovery.poll.interval.ms to 1000ms, but it doesn't seem to help
    Any insights?
    Ashhar Hasan
    @hashhar
    @eric-weaver If the number of records in your database history is large, then you might need to set it to an obscenely large value. I faced this once too and got away with 10000ms.
    Eric Weaver
    @eric-weaver
    history topic only appears to be like 10mb
    let me try increasing to 10000ms
    Eric Weaver
    @eric-weaver
    hmm @hashhar I attempted increasing the database.history.kafka.recovery.poll.interval.ms and database.history.kafka.recovery.attempts to much higher values but no luck
    any other ideas?
    Eric Weaver
    @eric-weaver
    So I was finally able to get past this issue by doing the following:
    1. Stop connector
    2. Delete history topic
    3. Re-submit the connector with snapshot.mode=schema_only_recovery (Connector recreated the history topic)
    4. Re-submit the connector with snapshot.mode=when_needed
      At this point I was able to get it back up and running again, and it successfully picked up where it left off.
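    Roughly the same sequence in commands might look like the sketch below; connector, topic and file names are placeholders, and the exact kafka-topics flags depend on your Kafka version, so verify before running anything destructive.

        # 1. delete the connector
        curl -X DELETE localhost:8083/connectors/my-connector
        # 2. delete the database history topic
        kafka-topics --bootstrap-server kafka:9092 --delete --topic dbhistory.my-connector
        # 3. re-submit with snapshot.mode=schema_only_recovery (the connector recreates the topic)
        curl -X POST -H "Content-Type: application/json" localhost:8083/connectors -d @my-connector-recovery.json
        # 4. once it is running again, update the config back to snapshot.mode=when_needed
        curl -X PUT -H "Content-Type: application/json" localhost:8083/connectors/my-connector/config -d @my-connector-config.json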
    Cameron Seebach
    @cseebach-tpc

    Hey folks! I'm planning to take the data from Debezium and put it into an Elasticsearch cluster - during the initial snapshot, I need to increase the resources allocated to the ES cluster, and then size back down for ongoing CDC. Does Debezium provide events that I could use to figure out when the initial snapshot is finished?

    If not, I'm planning to do some kind of watching of the Kafka topics to check when the rate of added events slows down enough. That would work, but it isn't as clean as I would like.
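    Not a definitive answer, but the connectors expose snapshot metrics over JMX (e.g. a SnapshotCompleted attribute); a rough sketch of polling it with Kafka's JmxTool, assuming JMX is enabled on the Connect worker and the logical server name is dbserver1:

        # query the Debezium Postgres snapshot metrics over JMX (host/port are illustrative)
        kafka-run-class kafka.tools.JmxTool \
          --jmx-url service:jmx:rmi:///jndi/rmi://connect-host:9012/jmxrmi \
          --object-name 'debezium.postgres:type=connector-metrics,context=snapshot,server=dbserver1' \
          --attributes SnapshotRunning,SnapshotCompleted --one-time true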

    2 replies
    Praveen Kumar K S
    @prawen
    How can I use the Debezium connector pointing to pgpool? Is it supported?
    3 replies
    Kushal Reddy
    @kushalreddy_gitlab
    @jpechane What is the default retention period of the dbhistory topic?
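    Not an authoritative statement of the default, but it is easy to check what the history topic was actually created with (topic name is a placeholder):

        # per-topic overrides such as retention.ms, if any were set
        kafka-configs --bootstrap-server kafka:9092 --entity-type topics --entity-name dbhistory.inventory --describe
        # partition / config summary for the topic
        kafka-topics --bootstrap-server kafka:9092 --describe --topic dbhistory.inventory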
    26 replies
    Rotem Fogel
    @rotemfo_twitter
    General question, can Debezium run in Standalone mode without relying on Kafka Connect?
    2 replies
    boriwat-cto
    @boriwat-cto
    Hi, I have a problem with table.whitelist. I created a connector to monitor 1 table and then added 2 more tables by updating table.whitelist. The tables that I added are not being consumed into Kafka topics. What should I do? P.S. Debezium == 1.1.1.Final
    MaheshwariGaurav
    @MaheshwariGaurav

    Hello everyone,
    I am using the Debezium connector for Postgres Aurora on AWS. I am able to test CDC using a dummy table, but when I checked with the actual table, it keeps giving me the info message "flushing 0 outstanding messages for offset commit" and I don't see any data in the Kafka topic.
    However, the table is being loaded in real time and I can see data in it.

    Below is the log output; I don't see any WARN/ERROR, instead it just prints INFO about the connector.

    [2020-06-02 10:44:14,489] INFO Creating thread debezium-postgresconnector-debezium-change-event-source-coordinator (io.debezium.util.Threads)
    [2020-06-02 10:44:14,489] INFO WorkerSourceTask{id=debezium-srt-customer-connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask)
    [2020-06-02 10:44:14,491] INFO Metrics registered (io.debezium.pipeline.ChangeEventSourceCoordinator)
    [2020-06-02 10:44:14,492] INFO Context created (io.debezium.pipeline.ChangeEventSourceCoordinator)
    [2020-06-02 10:44:14,494] INFO Previous snapshot has completed successfully, streaming logical changes from last known position (io.debezium.connector.postgresql.snapshot.InitialSnapshotter)
    [2020-06-02 10:44:14,494] INFO According to the connector configuration no snapshot will be executed (io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource)
    [2020-06-02 10:44:14,495] INFO Snapshot ended with SnapshotResult [status=SKIPPED, offset=PostgresOffsetContext [sourceInfo=source_info[server='debezium'db='srt', lsn=0/A50C6B8, timestamp=2020-06-02T10:42:01.761884Z, snapshot=FALSE], partition={server=debezium}, lastSnapshotRecord=false]] (io.debezium.pipeline.ChangeEventSourceCoordinator)
    [2020-06-02 10:44:14,497] INFO Connected metrics set to 'true' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics)
    [2020-06-02 10:44:14,497] INFO Starting streaming (io.debezium.pipeline.ChangeEventSourceCoordinator)
    [2020-06-02 10:44:14,497] INFO Initializing PgOutput logical decoder publication (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
    [2020-06-02 10:44:14,601] INFO Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=173065912, catalogXmin=5902670] (io.debezium.connector.postgresql.connection.PostgresConnection)
    [2020-06-02 10:44:14,966] INFO REPLICA IDENTITY for 'srt.customer' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns (io.debezium.connector.postgresql.PostgresSchema)
    [2020-06-02 10:44:23,351] INFO WorkerSourceTask{id=debezium-customer-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
    [2020-06-02 10:44:23,352] INFO WorkerSourceTask{id=debezium-customer-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
    [2020-06-02 10:44:23,389] INFO WorkerSourceTask{id=debezium-customer-connector-0} Finished commitOffsets successfully in 38 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
    [2020-06-02 10:44:33,389] INFO WorkerSourceTask{id=debezium-customer-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
    [2020-06-02 10:44:33,390] INFO WorkerSourceTask{id=debezium-customer-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)

    Can someone please help me with this?

    MaheshwariGaurav
    @MaheshwariGaurav
    This is the connector config I am using.
    {
        "name": "debezium-srt-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": "XXXXXXXXXXXXXX",
            "database.port": "5432",
            "database.user": "XXXXX",
            "database.password": "XXXXXX",
            "database.dbname" : "srt",
            "plugin.name":"pgoutput",
            "database.server.name": "debezium",
            "slot.name":"debezium",
            "table.whitelist": "srt.customer" ,
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones": "false",
            "transforms.unwrap.operation.header": "true",
            "producer.override.security.protocol": "SSL",
            "heartbeat.interval.ms":"3000"
    
    
           }
    }
    MaheshwariGaurav
    @MaheshwariGaurav
    Part 3: I first got this issue:
    [2020-06-02 09:55:01,232] WARN Received 10001 events which were all filtered out, so no offset could be committed. This prevents the replication slot from acknowledging the processed WAL offsets, causing a growing backlog of non-removeable WAL segments on the database server. Consider to either adjust your filter configuration or enable heartbeat events (via the heartbeat.interval.ms option) to avoid this situation. (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource)
    I then set heartbeat.interval.ms=3000, and since then I am not getting any error/warning; instead the connector just prints
    "flushing 0 outstanding messages for offset commit". Please help, I am stuck.
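    One guess worth checking (not a confirmed diagnosis): whether srt.customer is actually part of the publication pgoutput streams from, and whether the replication slot is advancing. Host, user and database below are placeholders.

        # list the tables included in the publication used by pgoutput
        psql -h <host> -U postgres -d srt -c "SELECT * FROM pg_publication_tables;"
        # check the replication slot state and confirmed flush position
        psql -h <host> -U postgres -d srt -c "SELECT slot_name, active, confirmed_flush_lsn FROM pg_replication_slots;"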
    MaheshwariGaurav
    @MaheshwariGaurav
    @jpechane any suggestion to resolve this issue?
    Jonathan N Baradza
    @JonathanNgoni
    Good day, good people. I would like to know if anyone can help me: I want to use Debezium to capture data changes in my Postgres 11 Docker instance, and I already have tables and data coming into Postgres. How can I use the native Postgres pgoutput logical replication with Debezium, or how should I install/extend my Postgres container with the decoderbufs/wal2json plugins? Thank you in advance.
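    For pgoutput specifically, no extra plugin install is needed on Postgres 10+; the server just needs wal_level=logical and the connector set to plugin.name=pgoutput. A rough sketch, with the container name and credentials as placeholders:

        # run (or reconfigure) the Postgres 11 container with logical decoding enabled
        docker run -d --name postgres11 -e POSTGRES_PASSWORD=postgres postgres:11 -c wal_level=logical
        # verify
        docker exec postgres11 psql -U postgres -c "SHOW wal_level;"
        # then in the Debezium connector config use "plugin.name": "pgoutput"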
    Jiri Pechanec
    @jpechane