
I'm using the Debezium (0.7.5) MySQL connector and I'm trying to understand the best way to update its configuration when it uses the table.whitelist option.

Let's say I create a connector, something like this:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://debezium-host/connectors/ -d '
{
  "name": "MyConnector",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "connect.timeout.ms": "60000",
      "tasks.max": "1",
      "database.hostname": "myhost",
      "database.port": "3306",
      "database.user": "***",
      "database.password": "***",
      "database.server.id": "3227197",
      "database.server.name": "MyServer",
      "database.whitelist": "myDb",
      "table.whitelist": "myDb.table1,myDb.table2",
      "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
      "database.history.kafka.topic": "MyConnectorHistoryTopic",
      "max.batch.size": "1024",
      "snapshot.mode": "initial",
      "decimal.handling.mode": "double"
    }
}'

After some time (2 weeks), I need to add a new table (myDb.table3) to the table.whitelist option. This table is an old one: it was created before the connector.

What I tried was:

  • Paused the connector.
  • Deleted the history topic (maybe this was the problem?).
  • Updated the config via the API's update-config endpoint.
  • Resumed the connector.

Update command via API:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" https://kafka-connect-host/connectors/MyConnector/config/ -d '
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "connect.timeout.ms": "60000",
  "tasks.max": "1",
  "database.hostname": "myhost",
  "database.port": "3306",
  "database.user": "***",
  "database.password": "***",
  "database.server.id": "3227197",
  "database.server.name": "MyServer",
  "database.whitelist": "myDb",
  "table.whitelist": "myDb.table1,myDb.table2,myDb.table3",
  "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
  "database.history.kafka.topic": "MyConnectorHistoryTopic",
  "max.batch.size": "1024",
  "snapshot.mode": "schema_only",
  "decimal.handling.mode": "double"
}'
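For illustration only, the pause / update / resume sequence described above could be scripted roughly as follows. This is a minimal sketch that builds the three Kafka Connect REST requests without sending them; the helper names `add_table` and `build_requests` are hypothetical, and the sample config is trimmed to two keys. It only reproduces the attempt, which by itself did not produce a snapshot of the new table.

```python
import json
import urllib.request


def add_table(config, table, key="table.whitelist"):
    """Return a copy of config with `table` appended to the comma-separated list."""
    updated = dict(config)
    tables = [t.strip() for t in updated.get(key, "").split(",") if t.strip()]
    if table not in tables:
        tables.append(table)
    updated[key] = ",".join(tables)
    return updated


def build_requests(base_url, name, config):
    """Build (but do not send) the pause, config-update and resume requests, in order."""
    base = f"{base_url.rstrip('/')}/connectors/{name}"
    headers = {"Accept": "application/json", "Content-Type": "application/json"}
    body = json.dumps(config).encode("utf-8")
    return [
        urllib.request.Request(f"{base}/pause", method="PUT", headers=headers),
        urllib.request.Request(f"{base}/config", data=body, method="PUT", headers=headers),
        urllib.request.Request(f"{base}/resume", method="PUT", headers=headers),
    ]


# Trimmed-down stand-in for the full connector config shown in the question.
current = {"table.whitelist": "myDb.table1,myDb.table2", "snapshot.mode": "initial"}
updated = add_table(current, "myDb.table3")
requests_to_send = build_requests("https://kafka-connect-host", "MyConnector", updated)
for req in requests_to_send:
    print(req.get_method(), req.full_url)
# Sending is left to the caller, e.g. urllib.request.urlopen(req) for each request.
```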

But it didn't work, and maybe this isn't the best approach at all. In other connectors I'm not using the table.whitelist option, so when I needed to listen to a new table, I didn't have this problem.

My last option, I think, would be to delete this connector and create another one with the new configuration that also listens to the new table (myDb.table3). The problem is that if I want the initial data from myDb.table3, I would have to create it with snapshot.mode set to initial, but I don't want to regenerate all the snapshot messages for the other tables (myDb.table1, myDb.table2).

OneCricketeer
japoneizo

3 Answers


With the latest version of Debezium Server, you can add the following config:

debezium.snapshot.new.tables=parallel

If you are using Debezium itself (rather than Debezium Server), you can try this config value:

snapshot.new.tables=parallel

Note: Debezium Server is the one that supports Kinesis, Google Pub/Sub, and Apache Pulsar. I am using it, and its configuration is a bit different: I had to prepend "debezium" to each item.

Once this configuration is added, Debezium will create snapshots for any tables that are subsequently added to table.whitelist.
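Applied to a connector config like the one in the question, the change suggested here might look like the sketch below. The helper name `enable_new_table_snapshots` is hypothetical, and `snapshot.new.tables` is taken from this answer rather than from the documentation, so whether it takes effect on a given Debezium version is an assumption to verify.

```python
def enable_new_table_snapshots(config, new_table, mode="parallel"):
    """Return a copy of config that whitelists new_table and sets snapshot.new.tables."""
    updated = dict(config)
    tables = [t for t in updated.get("table.whitelist", "").split(",") if t]
    if new_table not in tables:
        tables.append(new_table)
    updated["table.whitelist"] = ",".join(tables)
    # Option name as given in this answer; not confirmed against the docs.
    updated["snapshot.new.tables"] = mode
    return updated


# Trimmed-down stand-in for the full connector config.
original = {"table.whitelist": "myDb.table1,myDb.table2"}
patched = enable_new_table_snapshots(original, "myDb.table3")
print(patched)
```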

I cannot point you to the documentation, but I went through their code on GitHub and also tried it in practice, and it worked for me. Here is the link to the MySqlConnector code:

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java

There, search for Field.create("snapshot.new.tables").

Personally, I feel like Debezium has a lot of features, but the documentation is scattered.

  • Hey Pavan, could you post a link to the documentation about the snapshot.new.tables config please? I can't find it on the docs site – MarkNS Jan 13 '21 at 15:28
  • Hi @MarkNS, I cannot point you to the documentation but I went through their code in github and also I tried it practically which worked for me. Here is the link to MySqlConnector code https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java There search for Field.create("snapshot.new.tables") Personally, I feel like Debezium has a lot of things but documentation is scattered. – Pavan Kumar Aryasomayajulu Jan 15 '21 at 05:36
  • With debezium-connector-mysql-1.8.1.Final-plugin it's not working. What version are you using? Thanks – raphaelauv Mar 30 '22 at 08:51

Changes to the whitelist/blacklist config are not yet supported at this point. This is currently being worked on (see DBZ-175), and we hope to have preview support for this in one of the next releases. There's a pending PR for this, which needs a bit more work, though.

Until this has been implemented, your best option is to set up a new instance of the connector which only captures the additional tables you're interested in. This comes at the price of running two connectors (which both will maintain a binlog reader session), but it does the trick as long as you don't need to change your filter config too often.
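As a rough sketch of that workaround, the second connector's registration payload could be derived from the first one as shown below. The helper name and the new connector name, server id, server name and history topic are all hypothetical placeholders; the assumption is that each connector needs its own `database.server.id`, logical server name and history topic.

```python
def second_connector(base, name, server_id, server_name, tables, history_topic):
    """Derive a connector definition that captures only the additional tables."""
    config = dict(base["config"])
    config.update({
        "database.server.id": str(server_id),            # must differ from the first connector
        "database.server.name": server_name,             # separate logical name / topic prefix
        "table.whitelist": ",".join(tables),              # only the newly added tables
        "database.history.kafka.topic": history_topic,   # do not share the history topic
        "snapshot.mode": "initial",                       # snapshot just these tables
    })
    return {"name": name, "config": config}


# Trimmed-down stand-in for the first connector's definition.
first = {
    "name": "MyConnector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.server.id": "3227197",
        "database.server.name": "MyServer",
        "table.whitelist": "myDb.table1,myDb.table2",
        "database.history.kafka.topic": "MyConnectorHistoryTopic",
        "snapshot.mode": "initial",
    },
}
second = second_connector(
    first, "MyConnectorTable3", 3227198, "MyServerTable3",
    ["myDb.table3"], "MyConnectorTable3HistoryTopic",
)
print(second)
```

The resulting dictionary can be serialized to JSON and POSTed to the /connectors/ endpoint in the same way as the original connector.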

Gunnar
  • Thanks, @Gunnar. I did what you suggested. Now, I'm planning to migrate from Debezium 0.7.5 to 0.9 to avoid this problem. – japoneizo Feb 26 '19 at 19:28
  • @japoneizo did the new version solve your problem? If yes, please tell me the exact Debezium version you are using. – ankit Aug 28 '19 at 16:00
  • @japoneizo which version are you currently using? If you are on 0.10, then well and good, it's working fine (make sure you have this configuration: ```binlog_row_image=full```). If 0.9.2.Final, how did you achieve it? – Priyabrata Nov 15 '19 at 06:53
  • Does this issue also happen with the latest version of DBZ, like 1.1? – YVS1997 Jun 18 '20 at 07:43
  • is it still an issue? – Alok Kumar Singh Sep 14 '20 at 11:09

I had the same problem and solved it with a Debezium signal table. It works like this: you create a table in your database through which you send commands to Debezium.

CREATE TABLE public.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32)  NULL, data VARCHAR(2048)  NULL);

Then set the property "signal.data.collection": "public.debezium_signal" in your Debezium configuration.

After that, you can send commands by inserting rows into that table:

INSERT INTO debezium_signal (id, type, data)
VALUES(gen_random_uuid(),'execute-snapshot','{"data-collections": ["myDb.table3"]}');

In my case, I also had to add the signal table to table.include.list and its columns to column.include.list.
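Because the data column holds JSON, it is easy to mistype by hand. A minimal sketch of building the signal row programmatically is shown below; the helper name `execute_snapshot_signal` is hypothetical, and the `%s` placeholders assume a DB-API driver that uses that parameter style. The row is only built and printed here, not inserted.

```python
import json
import uuid


def execute_snapshot_signal(tables):
    """Build the column values for an execute-snapshot signal row."""
    return {
        "id": str(uuid.uuid4()),                                  # 36 characters, fits VARCHAR(42)
        "type": "execute-snapshot",
        "data": json.dumps({"data-collections": list(tables)}),  # always valid JSON
    }


signal = execute_snapshot_signal(["myDb.table3"])
# Parameterized statement; placeholder style depends on the database driver (assumption).
sql = "INSERT INTO debezium_signal (id, type, data) VALUES (%s, %s, %s)"
params = (signal["id"], signal["type"], signal["data"])
print(sql, params)
```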

https://debezium.io/documentation/reference/stable/configuration/signalling.html