    Oskar Dudycz
    @oskardudycz
    let me try that :)
    Chris Cranford
    @Naros
    The replication slot being active is what threw me there; I was so focused on that part of it.
    Oskar Dudycz
    @oskardudycz
    Now it works :D Chris, you have great eyes :P
    thank you :)
    Chris Cranford
    @Naros
    @oskardudycz np
    Oskar Dudycz
    @oskardudycz
    I might have some different questions coming up. I'm one of the maintainers of the Marten library (http://jasperfx.github.io/marten/documentation/). It's a .NET library that allows you to use Postgres as a document database and event store.
    I'm playing with a PoC that uses Debezium to let our users automatically transfer events from Marten through Kafka Connect to Kafka.
    Chris Cranford
    @Naros
    That sounds awesome. If you get a workable PoC, we'd love to take a look at it.
    Oskar Dudycz
    @oskardudycz
    For the document tables - it already works
    I was even able to move the data to Elastic automatically through Kafka Connect
    Although there is an issue with JSON columns
    With the outbox pattern it's possible to translate them into objects
    while with the regular approach they're treated as strings
    Same with events, so that's also the reason I wanted to check the outbox pattern, as it has more powerful SMT options
    Still, it seems to be strictly designed for event handling.
    Re: events - we have a similar but slightly different event table structure, besides the naming of the columns - e.g. we don't have aggregateType; we keep that information in a separate table. Do you know if providing all columns is required?
    Do you also know if, for the "regular" approach, it's possible to use an SMT translating a JSONB column into an object?
    Oskar Dudycz
    @oskardudycz
    I'd be willing to contribute (though I'm not a Scala specialist) or at least collaborate, as Debezium seems to be a nice match for Marten and especially for our users ;)
    Chris Cranford
    @Naros
    @oskardudycz the fields themselves can be named whatever you wish, e.g. it doesn't have to be "aggregateType" but could be "routedby"; you'd configure that with route.by.field, as an example.
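    For reference, a hedged sketch of how such an override could look in a connector config - in Kafka Connect, SMT options are namespaced under the transform alias (here "outbox"), and "routedby" is just the hypothetical column name from the example above:
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "transforms.outbox.route.by.field": "routedby"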
    Oskar Dudycz
    @oskardudycz
    Yes, I saw that
    Chris Cranford
    @Naros
    But yes, I believe all those columns must exist in some fashion.
    Oskar Dudycz
    @oskardudycz
    I'm more concerned about whether all of them are required, e.g. if I don't have an aggregateType field
    Ok, thank you
    Do you also know if, for the "regular" approach, it's possible to use an SMT translating a JSONB column into an object? <= do you know if that's currently possible?
    Chris Cranford
    @Naros
    Looking at the source, if routeByField isn't provided, you'll likely hit an NPE, as the code does eventStruct.getString(routeByField).toLowerCase() to reroute the message to the desired topic.
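    To spell out why that produces an NPE, a minimal sketch of the call path - only the quoted expression comes from the actual source; the surrounding lines are assumed:
        // sketch; the getString(...) expression is from the message above, the rest is assumed
        String routeBy = eventStruct.getString(routeByField); // null when the column is absent
        String topic = routeBy.toLowerCase();                 // throws NullPointerException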
    Oskar Dudycz
    @oskardudycz
    Understood
    I think I can come up with some workaround for that
    we have a concept of tenancy and a TenantId string column where the user can put anything
    Chris Cranford
    @Naros
    @oskardudycz Yes, you should be able to do JSONB to Object in an SMT I believe. That may require some schema changes before you emit the message.
    Oskar Dudycz
    @oskardudycz
    Do you know if any such SMTs already exist?
    I tried to look for one but wasn't able to find any
    Or at least could you give me some hints on where to look for that?
    Chris Cranford
    @Naros
    I don't know of any specific to JSONB; however, you can take a look at our transformations for inspiration, https://github.com/debezium/debezium/tree/master/debezium-core/src/main/java/io/debezium/transforms
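    For anyone reading along, a minimal sketch of what a JSONB-to-object SMT could look like. This is an assumption-laden illustration, not an existing Debezium transform: the class name, the hard-coded "data" field, and the schemaless output are all made up, and it assumes the Debezium change-event envelope has already been unwrapped so the value Struct exposes the table columns directly:
        import java.util.Map;

        import org.apache.kafka.common.config.ConfigDef;
        import org.apache.kafka.connect.data.Struct;
        import org.apache.kafka.connect.source.SourceRecord;
        import org.apache.kafka.connect.transforms.Transformation;

        import com.fasterxml.jackson.databind.ObjectMapper;

        // Hypothetical SMT (not part of Debezium): parses a JSON string column
        // ("data" - name assumed) into a Map so sinks like Elasticsearch receive
        // an object instead of a string.
        public class JsonbToObject implements Transformation<SourceRecord> {
            private static final ObjectMapper MAPPER = new ObjectMapper();

            @Override
            public SourceRecord apply(SourceRecord record) {
                Struct value = (Struct) record.value();
                String json = value.getString("data"); // assumed JSONB column name
                try {
                    Map<?, ?> parsed = MAPPER.readValue(json, Map.class);
                    // Emit the parsed object as a schemaless value; a production SMT
                    // would rebuild the value schema instead of dropping it.
                    return record.newRecord(record.topic(), record.kafkaPartition(),
                            record.keySchema(), record.key(), null, parsed, record.timestamp());
                } catch (Exception e) {
                    return record; // on parse failure, pass the record through untouched
                }
            }

            @Override
            public ConfigDef config() {
                return new ConfigDef();
            }

            @Override
            public void configure(Map<String, ?> configs) {
            }

            @Override
            public void close() {
            }
        }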
    Oskar Dudycz
    @oskardudycz
    Ok, thank you, I'll try
    Thank you for your help - I appreciate that :+1:
    If I have more questions around that, is Gitter the right place to ask?
    Chris Cranford
    @Naros
    Here or https://groups.google.com/forum/#!forum/debezium are the best ways to reach us.
    Oskar Dudycz
    @oskardudycz
    OK, thank you, I'll try not to bother you too much if it's not needed, although as I said - I'm not a Java/Scala developer, so unfortunately I might have some newbie questions ;)
    Chris Cranford
    @Naros
    No worries, ask away and we'll try to respond asap.
    Oskar Dudycz
    @oskardudycz
    :+1:
    Andrew Garrett
    @garrett528
    hey, I've been getting errors for a while when connecting to Postgres RDS - a SocketClosed error that occurs when the source connector tries to flush the LSN
    the producer then fails and I have to restart the whole process
    Andrew Garrett
    @garrett528
    I see that there were old issues related to this problem, but they didn't seem to be resolved: https://issues.apache.org/jira/browse/KAFKA-5352 https://issues.jboss.org/browse/DBZ-248
    Jiri Pechanec
    @jpechane
    @garrett528 Hi, does it make sense for you to just write an SMT that would extract the PK information from the key and insert it into the value? It would not require any change in the DBZ code, and you could have it available immediately
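    A short sketch of the apply() body for that idea - the Struct key/value and the "id" field name are assumptions, not anyone's actual schema:
        @Override
        public SourceRecord apply(SourceRecord record) {
            Struct key = (Struct) record.key();
            Struct value = (Struct) record.value();
            // copy the PK field from the key into the value; assumes the value
            // schema already declares an "id" field (hypothetical name)
            value.put("id", key.get("id"));
            return record;
        }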
    Jiri Pechanec
    @jpechane
    @garrett528 Could you please try to manually upgrade to the latest Postgres driver (PR is pending)? If you manage to get it running on RDS and verify whether the problem is mitigated, it would help us a lot
    Oskar Dudycz
    @oskardudycz
    So as I wrote above, with a table whose structure is exactly the same as in the docs - it works, and that's great ;)
    I tried to do the next step - connecting it to our Marten tables as well. We have this structure:
    CREATE TABLE meetingsmanagementwrite.mt_events
    (
        seq_id bigint NOT NULL,
        id uuid NOT NULL,
        stream_id uuid,
        version integer NOT NULL,
        data jsonb NOT NULL,
        type character varying(500) COLLATE pg_catalog."default" NOT NULL,
        "timestamp" timestamp with time zone NOT NULL DEFAULT now(),
        tenant_id character varying COLLATE pg_catalog."default" DEFAULT '*DEFAULT*'::character varying,
        mt_dotnet_type character varying COLLATE pg_catalog."default",
        CONSTRAINT pk_mt_events PRIMARY KEY (seq_id)
    );
    I used the following source connector config:
    {
        "name": "postgres-source-connector-mt_events",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "Password12!",
            "database.dbname": "postgres",
            "database.server.name": "dbserver1",
            "table.whitelist" : "meetingsmanagementwrite.mt_events",
            "tombstones.on.delete" : "false",
            "transforms": "outbox",
            "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
            "table.field.event.id": "id",
            "table.field.event.key": "stream_id",
            "table.field.event.type": "type",
            "table.field.event.payload": "data",
            "table.field.event.payload.id": "stream_id",
            "route.by.field": "tenant_id"
        }
    }
    and I'm getting:
    2019-09-21 12:01:27,002 ERROR  ||  WorkerSourceTask{id=postgres-source-connector-mt_events-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
    org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NullPointerException
        at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:104)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more
    are there instructions somewhere on how to debug Debezium? ;)
    Oskar Dudycz
    @oskardudycz
    I added one record:
    INSERT INTO meetingsmanagementwrite.mt_events(
        seq_id, id, stream_id, version, data, type, "timestamp", tenant_id, mt_dotnet_type)
        VALUES (0, '1915d7de-3482-4d3a-b072-109e41e5cb0d', '9a0af90b-288b-454d-94e4-a44574748db6', 1, '{ "Name": "test", "Created": "2019-09-21T11:16:46.9169224Z", "MeetingId": "9a0af90b-288b-454d-94e4-a44574748db6"}', 'meeting_created', '2019-09-21 11:16:47.606273+00', '*DEFAULT*', 'Marten.WebApi.Meetings.Events.MeetingCreated, Marten.WebApi');