Jagannath TImma
@jagan-autonomic
pg_replication_slots only has the first connector's slot
Chris Cranford
@Naros
@mhaagens No worries, ping us if you still run into issues.
Jagannath TImma
@jagan-autonomic
and both are wal2json
Chris Cranford
@Naros
@jagan-autonomic Oh, that's odd. Can you please paste both configs here real fast?
You really should have 2 replication slots defined in that PG table if they're configured differently, or else we have some regression.
Martin
@mhaagens
@Naros Works perfectly now. Thanks so much for helping me out!
Chris Cranford
@Naros
@mhaagens Awesome, we welcome any feedback you have on pgoutput, good & bad.
We're super excited about it and certainly like seeing the adoption of it
Jagannath TImma
@jagan-autonomic
{
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "snapshot.mode": "never",
      "database.dbname": "assets",
      "database.port": "5432",
      "slot.name": "debezium-devicebindings",
      "name": "device-bindings-debezium-connector",
      "database.server.name": "assets",
      "plugin.name": "wal2json_rds_streaming",
      "heartbeat.interval.ms":"5000",
      "table.whitelist": "public.bindings",
      "rows.fetch.size":"1000",
      "max.queue.size":"2000",
      "max.batch.size":"1000",
      "tombstones.on.delete":"false",
      "database.tcpKeepAlive": "true",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "com.autonomic.connect.converters.BindingChangeEventsConverter",
      "transforms":"Bindings,ValueToKey,DBZExtractField",

      "transforms.Bindings.type":"org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.Bindings.regex":"assets.public.bindings",
      "transforms.Bindings.replacement":"device-binding-change-events-dbz-trial",

      "transforms.ValueToKey.type": "com.autonomic.connect.transformers.AssetsValueToKey",
      "transforms.ValueToKey.fields":"before.subject_id",

      "transforms.DBZExtractField.type": "com.autonomic.connect.transformers.DBZExtractField$Key",
      "transforms.DBZExtractField.field":"subject_id"
}

{
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "snapshot.mode": "initial",
      "database.dbname": "assets",
      "database.port": "5432",
      "slot.name": "debeziumbindings",
      "name": "bindings-debezium-connector",
      "database.server.name": "assets",
      "plugin.name": "wal2json_rds_streaming",
      "heartbeat.interval.ms":"5000",
      "table.whitelist": "public.bindings",
      "rows.fetch.size":"1000",
      "max.queue.size":"2000",
      "max.batch.size":"1000",
      "tombstones.on.delete":"false",
      "database.tcpKeepAlive": "true",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "com.autonomic.connect.converters.BindingChangeEventsConverter",
      "transforms":"Bindings,ValueToKey,DBZExtractField",

      "transforms.Bindings.type":"org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.Bindings.regex":"assets.public.bindings",
      "transforms.Bindings.replacement":"binding-change-events-trial",

      "transforms.ValueToKey.type": "com.autonomic.connect.transformers.AssetsValueToKey",
      "transforms.ValueToKey.fields":"before.object_id",

      "transforms.DBZExtractField.type": "com.autonomic.connect.transformers.DBZExtractField$Key",
      "transforms.DBZExtractField.field":"object_id"
}
@Naros the second config is for the first running connector. (I pasted out of order)
Chris Cranford
@Naros
@jagan-autonomic I honestly don't see anything that appears incorrect; I would definitely expect to see 2 entries in pg_replication_slots, one for debeziumbindings and one for debezium-devicebindings.
Jagannath TImma
@jagan-autonomic
ya
Chris Cranford
@Naros
And you don't see any warnings or cause for concern in the logs when you start the second connector for the device-bindings?
Jagannath TImma
@jagan-autonomic
nope.. I checked the logs but didn't find any errors/warnings
Chris Cranford
@Naros
Now, I learned recently that Kafka Connect will pull a connector's configuration from the config topic if you recreate the connector over and over with the same name.
So if you, say, had slot.name=X then destroyed the connector and re-created it with slot.name=Y, Connect first starts the connector with the old config, stops it, applies the new config, then restarts it.
But the end result should still be multiple slots.
Jagannath TImma
@jagan-autonomic
Ya, I am aware of that
funny thing is I still see this "Creating task device-bindings-debezium-connector-0" when Debezium restarts
Chris Cranford
@Naros
@jagan-autonomic Sorry for the delay, I was setting up an environment to just test with the latest 0.10 images.
postgres=# select * from pg_replication_slots;
 slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
 c1        | pgoutput | logical   |  12994 | postgres | f         | t      |        113 |      |          557 | 0/17587E0   | 0/1758818
 c2        | pgoutput | logical   |  12994 | postgres | f         | t      |        110 |      |          559 | 0/1758AC0   | 0/1758AF8
These 2 connectors are just very basic connector configs too.
Jagannath TImma
@jagan-autonomic
I actually might know what the problem is
but lemme verify before I say
Jagannath TImma
@jagan-autonomic
Ya.. such a silly thing
so PG apparently doesn't support "-" in the slot name
but when I created the connector, I swear I did not see any errors
but maybe I am blind
Chris Cranford
@Naros
@jagan-autonomic Awesome, I wasn't aware of the naming restriction either.
Was there any indication of that in the logs?
Jagannath TImma
@jagan-autonomic
as far as I know there wasn't
Chris Cranford
@Naros
I saw this in my local testing of it:
org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: ERROR: syntax error
    at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:127)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: ERROR: syntax error
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2497)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2233)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:446)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:370)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:311)
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:297)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:274)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:269)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:292)
    at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:125)
    ... 9 more
Jagannath TImma
@jagan-autonomic
Ya, it's possible I missed it in the logs
I am running this on Kubernetes, and it's also possible the pod was restarted and I lost the logs
Chris Cranford
@Naros
But this does raise the question of whether we should detect this problem earlier.
Jagannath TImma
@jagan-autonomic
that is true
Chris Cranford
@Naros
postgres=# select pg_create_logical_replication_slot('c-2','pgoutput');
ERROR:  replication slot name "c-2" contains invalid character
HINT:  Replication slot names may only contain lower case letters, numbers, and the underscore character.
@gunnarmorling Is this something you think we should check for during validation of the configuration?
Personally I think it would be more helpful to report up-front that the connector cannot be created due to bad configuration, rather than relying on spotting this failure in the logs.
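[Editor's note: the up-front check Chris is proposing could look like the sketch below, which encodes the rule from the PostgreSQL HINT (lower-case letters, digits, and underscores only). Class and method names are hypothetical, not Debezium's actual validation API.]

```java
import java.util.regex.Pattern;

public class SlotNameValidator {
    // PostgreSQL replication slot names may only contain lower-case
    // letters, digits, and the underscore character.
    private static final Pattern VALID_SLOT_NAME = Pattern.compile("[a-z0-9_]+");

    public static boolean isValid(String slotName) {
        return slotName != null && VALID_SLOT_NAME.matcher(slotName).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("debeziumbindings"));        // true
        System.out.println(isValid("debezium-devicebindings")); // false: '-' is not allowed
    }
}
```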
@jagan-autonomic Glad it's working now then, I'm going to head out for the day. Catch you all next week.
Gunnar Morling
@gunnarmorling
@Naros yes, definitely
Cyril Scetbon
@cscetbon
Can you confirm that changing a configuration parameter means having to restart the EmbeddedEngine?
Chris Cranford
@Naros
@cscetbon Yes you will. The original Configuration object passed to the embedded engine during construction is converted to a HashMap<> instance later on and provided to the underlying connector.
So any changes you make to the original Configuration object won't propagate to the connector.
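[Editor's note: the gotcha Chris describes can be reproduced with plain JDK collections — once the configuration is copied into a HashMap at construction time, later edits to the source object never reach the copy. A generic illustration, not Debezium's actual classes.]

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigSnapshot {
    // Returns what a snapshot taken before a later mutation still sees.
    public static String snapshotValueAfterMutation() {
        Map<String, String> original = new HashMap<>();
        original.put("slot.name", "debeziumbindings");

        // The engine effectively copies the configuration at construction time.
        Map<String, String> snapshot = new HashMap<>(original);

        // Mutating the original afterwards does not reach the copy, which is
        // why the engine must be restarted to pick up a changed configuration.
        original.put("slot.name", "new_slot");

        return snapshot.get("slot.name");
    }

    public static void main(String[] args) {
        System.out.println(snapshotValueAfterMutation()); // prints "debeziumbindings"
    }
}
```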
Cyril Scetbon
@cscetbon
gotcha. Thanks @Naros
Mincong Huang
@mincong-h
Hi Debezium committers, how are you? Your project looks very interesting and I want to participate. I saw some "easy-starter" tickets and would like to try this one: https://issues.jboss.org/browse/DBZ-1336 Support Postgres LTREE columns. Do you think it's a good idea? I will share my thoughts in the next comment.
Mincong Huang
@mincong-h
In order to support LTREE columns, I need to support LTREE and LTREE arrays. Something similar was done before for HStore (debezium/debezium@38f4a0e). It seems that the type needs to be declared in TypeRegistry so that it can be recognized. Then, an LTREE conversion needs to be added in PostgresValueConverter. Maybe some additional configuration in PostgresConnectorConfig as well. For testing, there is the class AbstractRecordsProducerTest and the file postgres_create_tables.ddl. Is that correct?
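[Editor's note: for context, a Postgres ltree value is a dot-separated path of labels, e.g. Top.Science.Astronomy, and a converter would most plausibly surface it as a string. A toy sketch of that shape; purely illustrative, not part of the proposed Debezium change.]

```java
public class LtreeExample {
    // An ltree value is a dot-separated path of labels,
    // e.g. "Top.Science.Astronomy"; splitting on '.' yields the labels.
    public static String[] labels(String ltree) {
        return ltree.split("\\.");
    }

    public static void main(String[] args) {
        String[] parts = labels("Top.Science.Astronomy");
        System.out.println(parts.length); // 3
        System.out.println(parts[2]);     // Astronomy
    }
}
```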
Gunnar Morling
@gunnarmorling
@mincong-h welcome again