René Kerner
@rk3rn3r
@gunnarmorling I think you were writing some comments while we were talking about debezium/debezium#2273 but I can't see any. Can you check if by accident you still have the tab open and submitted?
Gunnar Morling
@gunnarmorling
oh, you're right
just done
René Kerner
@rk3rn3r
:pray:
Thimxn
@thimxns_twitter

Hey guys, I'm currently setting up Debezium (in Kafka) for an MSSQL server (on another machine in the network). So far everything seemed to work fine; however, when checking the status of the connector I get the following response:
{"name":"test-connector","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083","trace":"org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata\n"}],"type":"source"} while it used to be this in the first minute or so after starting it: {"name":"test-connector","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"localhost:8083"}],"type":"source"}.

The initial connector setup looks like the following:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "test-connector", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "192.168.230.30",
"database.port": "1433", "database.user": "***", "database.password": "***", "database.dbname": "aqotec_Daten", "database.server.name": "AQNEU", "table.whitelist": "dbo.RM360_L2R12", "database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.fulfillment" } }';

Furthermore when starting the consumer using this:

sudo docker run -it --rm --name consumer --link zookeeper:zookeeper --link kafka2:kafka2 debezium/kafka:1.1 watch-topic -a AQNEU.aqotec_Daten.dbo.RM360_L2R12 --max-messages 1

I get no errors, but it also doesn't track any of the changes:

WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.5:9092
Using KAFKA_BROKER=172.17.0.4:9092
Contents of topic AQOTECNEU.aqotec_Daten.dbo.RM360_L2R12:

If I used the wrong topic name with watch-topic -a, it would throw an error, so I guess it's detecting the connector, but the connector is not working.

Since I'm trying to give you as much information as I can, here is my docker setup:

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                                                        NAMES
17ee16d7d123        debezium/kafka:1.1       "/docker-entrypoint.…"   15 minutes ago      Up 15 minutes       8778/tcp, 9092/tcp, 9779/tcp                                                                 consumer
f41d9023971b        debezium/connect         "/docker-entrypoint.…"   8 days ago          Up 15 minutes       8778/tcp, 9092/tcp, 0.0.0.0:8083->8083/tcp, 9779/tcp                                         connect
cb6da30727b3        debezium/kafka:1.1       "/docker-entrypoint.…"   8 days ago          Up 8 days           8778/tcp, 9779/tcp, 0.0.0.0:9092->9092/tcp                                                   kafka2
d027f3e800a7        debezium/zookeeper:1.4   "/docker-entrypoint.…"   8 days ago          Up 8 days           0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 8778/tcp, 0.0.0.0:3888->3888/tcp, 9779/tcp   zookeeper

I'm rather new to Debezium and Docker, so it could be just some simple setup step that I messed up.

I already tried to find solutions for this but couldn't find any that worked so I decided to ask here. Thanks already for your help.
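
One possible cause, offered as a guess rather than a diagnosis: inside the connect container, localhost:9092 refers to the connect container itself and not to the kafka2 container, so the database history client may never reach a broker and eventually time out while fetching topic metadata. The sketch below is a plain TCP reachability check that can be run from the same network context as the Connect worker. The class name, the 3-second timeout and the default arguments are assumptions made for illustration. A successful connect only shows that the port is open; it does not prove that the broker's advertised listeners are reachable as well.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the address used in the connector config above.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If localhost:9092 turns out to be unreachable from the worker while kafka2:9092 is reachable, pointing database.history.kafka.bootstrap.servers at the latter would be the next thing to try.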

Gunnar Morling
@gunnarmorling
@Naros do you think you have an answer for this user: https://twitter.com/karim_elna/status/1383081141465403407
perhaps an enhancement request falls out of it? I'm not sure
Chris Cranford
@Naros
@gunnarmorling It sounds to me like the SMT complains that it cannot find field aggregatetype since Hibernate creates it as "aggregate_type". Is that how you read it?
Chris Cranford
@Naros
We added a way to customize the naming at build time, I think it's something like quarkus.debezium-outbox.aggregate-type.name=<value>
That way the generated XML could match whatever naming strategy they needed in the db.
Gunnar Morling
@gunnarmorling
@Naros yes, exactly, that's how i read it
but this guy is using spring boot
so not our extension for quarkus
so probably just need to adjust the column names?
Chris Cranford
@Naros
Ah, well in that case wouldn't they just change the route.by.field in the connector's SMT configuration to use aggregate_type rather than the default aggregatetype?
Gunnar Morling
@gunnarmorling
ah, yes, right
that'd work too
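
For readers following along, here is a minimal sketch of the SMT part of a connector configuration with route.by.field overridden as suggested above. It only builds and prints the relevant properties. The transform alias "outbox" and the class and method names are arbitrary choices for this sketch, and other outbox columns may need similar overrides if the naming strategy renames them too.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class OutboxRouterConfig {

    /** Builds the SMT-related connector properties for a given routing column. */
    static Map<String, String> routerConfig(String routeByColumn) {
        Map<String, String> config = new LinkedHashMap<>();
        // "outbox" is an arbitrary alias chosen for this sketch.
        config.put("transforms", "outbox");
        config.put("transforms.outbox.type", "io.debezium.transforms.outbox.EventRouter");
        config.put("transforms.outbox.route.by.field", routeByColumn);
        return config;
    }

    /** Renders the map as a flat JSON object; the values used here need no escaping. */
    static String toJson(Map<String, String> config) {
        return config.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\": \"" + e.getValue() + "\"")
                .collect(Collectors.joining(", ", "{ ", " }"));
    }

    public static void main(String[] args) {
        // Column name as generated by the naming strategy discussed above.
        System.out.println(toJson(routerConfig("aggregate_type")));
    }
}
```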
Chris Cranford
@Naros
Pretty sure we made sure that all this stuff was super flexible re naming.
Gunnar Morling
@gunnarmorling
do you want to reply?
or should i
Chris Cranford
@Naros
Could you, I don't use my twitter too often and don't recall the pwd.
Gunnar Morling
@gunnarmorling
LOL, ok :)
Chris Cranford
@Naros
tbh, looking at the quarkus extension I was struggling to remember what I wrote lol
it feels like ages since I last looked at it lol
Gunnar Morling
@gunnarmorling
hehe, yeah, has been a while
Sanjeev Singh
@sanjeevhbti45_twitter

Can someone help here? I am facing the error below in the Debezium MySQL connector while deleting rows.
Error : eventType=EXT_DELETE_ROWS

{"name":"connect-01","connector":{"state":"RUNNING","worker_id":"100.0.0.111:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"00.0.0.111:8083","trace":"org.apache.kafka.connect.errors.ConnectException: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1618682018000, eventType=EXT_DELETE_ROWS, serverId=520243567, headerLength=19, dataLength=8137, nextPosition=3791091837, flags=0}\n\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n\tat io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)\n\tat io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: java.lang.RuntimeException: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1618682018000, eventType=EXT_DELETE_ROWS, serverId=520243567, headerLength=19, dataLength=8137, nextPosition=3791091837, flags=0}\n\tat io.debezium.connector.mysql.BinlogReader.handleServerIncident(BinlogReader.java:668)\n\tat io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)\n\t... 
5 more\nCaused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1618682018000, eventType=EXT_DELETE_ROWS, serverId=520243567, headerLength=19, dataLength=8137, nextPosition=3791091837, flags=0}\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:300)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:223)\n\tat io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:249)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:957)\n\t... 3 more\nCaused by: java.io.EOFException\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)\n\tat java.base/java.io.InputStream.read(InputStream.java:271)\n\tat java.base/java.io.InputStream.skip(InputStream.java:531)\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.skipToTheEndOfTheBlock(ByteArrayInputStream.java:216)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:296)\n\t... 6 more\n"}],"type":"source"}
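
When reading a nested trace like the one above, the innermost "Caused by:" entry is usually the most informative; here it is java.io.EOFException, raised while the binlog client skipped to the end of the event block. The helper below is a small sketch for pulling that last cause out of a trace string, for example the "trace" field of the status response. The class and method names are made up for this example, and the sample trace is a shortened, hand-written stand-in for the real one.

```java
public class RootCause {

    /** Returns the first line of the last "Caused by:" section, or the first line of the trace if there is none. */
    static String rootCause(String trace) {
        String marker = "Caused by: ";
        int idx = trace.lastIndexOf(marker);
        String tail = idx >= 0 ? trace.substring(idx + marker.length()) : trace;
        int eol = tail.indexOf('\n');
        return (eol >= 0 ? tail.substring(0, eol) : tail).trim();
    }

    public static void main(String[] args) {
        // Shortened sample in the same shape as the trace above.
        String trace = "org.apache.kafka.connect.errors.ConnectException: outer\n"
                + "\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n"
                + "Caused by: java.lang.RuntimeException: middle\n"
                + "\tat io.debezium.connector.mysql.BinlogReader.handleServerIncident(BinlogReader.java:668)\n"
                + "Caused by: java.io.EOFException\n"
                + "\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)\n";
        System.out.println(rootCause(trace)); // prints java.io.EOFException
    }
}
```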

Gunnar Morling
@gunnarmorling
hey @jpechane, good morning
got something for you (maybe) :)
the other day, i wanted to understand how offsets are handled for source connectors with more than one task
so to see what will change for the sql server work
in order to do so, i created a super-simple source connector for etcd: https://github.com/gunnarmorling/kcetcd
(etcd has an easy-to-use "watch" feature which makes it possible to implement CDC-style functionality)
i consider this a learning vehicle for me, us, and others; e.g. we can use this for exploring that new exactly-once support in KC
but, i also thought you might be using it for a first PoC implementation of the new snapshotting
it's completely independent of debezium, a plain multi-task source
with proper resumeability
curious what you think :)
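
As a rough illustration of the offset question raised above, here is a toy model of how a multi-task source can resume: each task owns one source partition, commits the last position it has seen for that partition, and looks that position up again after a restart. In Kafka Connect the corresponding pieces are the sourcePartition and sourceOffset maps on each source record and the offset storage reader available to a task. Everything in the sketch (the in-memory OffsetStore, the key-prefix partitioning, the "revision" offset field) is an assumption for illustration and is not taken from kcetcd.

```java
import java.util.HashMap;
import java.util.Map;

public class MultiTaskOffsets {

    /** Toy stand-in for the Connect offset store: source partition -> last committed offset. */
    static final class OffsetStore {
        private final Map<Map<String, String>, Map<String, Long>> offsets = new HashMap<>();

        void commit(Map<String, String> partition, Map<String, Long> offset) {
            offsets.put(partition, offset);
        }

        Map<String, Long> read(Map<String, String> partition) {
            return offsets.get(partition);
        }
    }

    /** Toy task that owns exactly one partition (here: a hypothetical key prefix). */
    static final class Task {
        private final Map<String, String> partition;
        private long nextRevision;

        Task(String keyPrefix, OffsetStore store) {
            this.partition = Map.of("prefix", keyPrefix);
            Map<String, Long> stored = store.read(partition);
            // Resume after the last committed revision, or start from 1 if nothing is stored.
            this.nextRevision = stored == null ? 1L : stored.get("revision") + 1;
        }

        /** Pretends to emit count change events and commits the last revision seen. */
        void poll(int count, OffsetStore store) {
            if (count <= 0) {
                return;
            }
            long last = nextRevision + count - 1;
            store.commit(partition, Map.of("revision", last));
            nextRevision = last + 1;
        }

        long nextRevision() {
            return nextRevision;
        }
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        Task a = new Task("/a", store);
        Task b = new Task("/b", store);
        a.poll(5, store);
        b.poll(2, store);

        // Simulated restart: new task instances pick up where the old ones stopped.
        System.out.println(new Task("/a", store).nextRevision()); // prints 6
        System.out.println(new Task("/b", store).nextRevision()); // prints 3
    }
}
```

Because offsets are keyed by partition rather than by task, the tasks do not interfere with each other, and a partition can be handed to a different task after a rebalance without losing its position.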
René Kerner
@rk3rn3r
Does that mean changes coming to the Mongo DB connector soon?
Jiri Pechanec
@jpechane
@rk3rn3r Hi, no, it does not
René Kerner
@rk3rn3r
thx for the detailed explanation :laughing:
livolleyball
@livolleyball

@rk3rn3r Hi, no, it does not

thx for your explanation

Gunnar Morling
@gunnarmorling
@rk3rn3r what makes you think of changes to mongodb?
Sanjeev Singh
@sanjeevhbti45_twitter

Has anyone faced this error before?
Please share the solution.

Gunnar Morling
@gunnarmorling
@sanjeevhbti45_twitter is there more in your logs? can you send a mail to the debezium mailing list with more details please? thx
Sanjeev Singh
@sanjeevhbti45_twitter

@gunnarmorling The MySQL connector shows only the logs I shared here. I think this error is due to a heavy delete command (~3 Cr, i.e. about 30 million, rows deleted) that I ran on the MySQL side. Since then I have been facing this error again and again.

Could you please point me to the Debezium mailing list?
Also, please suggest what other details I can share with you, as these are the only logs I am getting from the MySQL connector.

Chris Cranford
@Naros

@jpechane I have what I hope is a quick question. So I've been working on the default value converter for Oracle and I've mirrored it primarily based on SQL Server's implementation.
The one thing I'm noticing specifically for FLOAT data types is the following error when I'm generating the initial schema representation for the table during snapshot:

Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
    at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
    at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
    at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
    at io.debezium.connector.oracle.OracleDatabaseSchema.applySchemaChange(OracleDatabaseSchema.java:70)
    at io.debezium.pipeline.EventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcher.java:460)
    at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createSchemaChangeEventsForTables$2(RelationalSnapshotChangeEventSource.java:273)
    ... 10 more
Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:252)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
    ... 26 more

When I look at the failure point, it's in the ConnectSchema#equals method, specifically this line:

if (o == null || getClass() != o.getClass()) return false;

The passed in object is a SchemaBuilder and this is all invoked when calling fieldBuilder.defaultValue inside TableSchemaBuilder#addField.
The field is represented as a VariableScaleDecimal.builder(), so perhaps there is something I'm missing here?
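
To make the failure mode described above concrete, here is a self-contained toy reproduction of an equals method with the same getClass() guard. It uses stand-in classes, not the Kafka Connect ones: BuiltSchema plays the role of a built schema and Builder the role of a builder that implements the same interface. Whether this is the actual cause in the Oracle default value converter is not confirmed here; the sketch only shows why comparing a built schema against a builder instance can never succeed with such a guard.

```java
import java.util.Objects;

public class BuilderEqualsDemo {

    interface Schema {
        String name();
    }

    /** Stand-in for a built, immutable schema. */
    static final class BuiltSchema implements Schema {
        private final String name;

        BuiltSchema(String name) {
            this.name = name;
        }

        public String name() {
            return name;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            // Same guard as the line quoted above: an object of a different class is never equal.
            if (o == null || getClass() != o.getClass()) return false;
            return Objects.equals(name, ((BuiltSchema) o).name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name);
        }
    }

    /** Stand-in for a builder that also implements the schema interface. */
    static final class Builder implements Schema {
        private final String name;

        Builder(String name) {
            this.name = name;
        }

        public String name() {
            return name;
        }

        BuiltSchema build() {
            return new BuiltSchema(name);
        }
    }

    public static void main(String[] args) {
        Builder builder = new Builder("VariableScaleDecimal");
        BuiltSchema built = builder.build();
        System.out.println(built.equals(builder));         // false: the classes differ
        System.out.println(built.equals(builder.build())); // true: same class, same name
    }
}
```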

Sergei Morozov
@morozov

Hi, a colleague of mine identified and fixed a performance bottleneck in mysql-binlog-reader (osheroff/mysql-binlog-connector-java#34) which the MySQL Debezium connector uses internally.

It's been 3 weeks now but there's no feedback on the PR. I'm curious if this community has some other communication channels with the maintainer to get it reviewed. Any additional feedback is of course welcome.

andrehsu13
@andrehsu13
Hello Debezium team, I have a question about implementing custom converters: https://debezium.io/documentation/reference/development/converters.html. I need to implement a field converter that converts any required field in Postgres to optional. Looking at the interface for implementing a custom converter, we are passed a RelationalColumn and are expected to return a target Kafka Connect schema, e.g. registration.register(SchemaBuilder.string(), x -> x.toString()). The problem is that I just need to access the column's current SchemaBuilder and call its optional() method to make it optional. Otherwise, I would need to convert from the column's jdbcType to Kafka Connect's SchemaBuilder. As far as I can tell, there's no easy way to do that. The closest is this: https://github.com/debezium/debezium/blob/e37abe5f92240c9f5f5a5c01cac9157dc8552bb1/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java#L150. Even this implementation requires a Column type, which is different from the SPI RelationalColumn interface. Do you guys have any recommendation on how I can get the correct SchemaBuilder and just call optional() on it to convert the required field to optional? Thanks.
Sanjeev Singh
@sanjeevhbti45_twitter

@gunnarmorling do you have any idea what could be causing this error?

Gunnar Morling
@gunnarmorling
hey @jpechane, any thoughts on that etcd connector and the idea of using it to PoC the new snapshotting?