I have the following requirements:
- On Day 1: I configured the following Debezium connector with a total of 2 tables in the include list. I wanted a complete snapshot for my use case, so I purposely kept the default value for snapshot.mode (initial).
{
  "name": "debezium-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "{{db_endpoint}}",
    "database.port": "{{db_port}}",
    "database.user": "{{db_user}}",
    "database.password": "{{db_pwd}}",
    "database.ssl.mode": "required",
    "database.server.id": "123456",
    "database.server.name": "dev_db",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory",
    "table.include.list": "SCHEMA.table1,SCHEMA.table2",
    "event.processing.failure.handling.mode": "fail",
    "include.schema.changes": "true",
    "time.precision.mode": "connect",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$2.$3",
    "decimal.handling.mode": "double",
    "heartbeat.interval.ms": "60000"
  }
}
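For reference, I register this connector with the Kafka Connect REST API (a sketch; the worker URL http://localhost:8083 and the file name debezium-connector.json holding the JSON above are assumptions on my side):

# Register the connector on Day 1; the file contains the full JSON shown above
curl -X POST -H "Content-Type: application/json" \
  --data @debezium-connector.json \
  http://localhost:8083/connectors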
- On Day 2 or any subsequent day: I got a requirement to add a few more tables (e.g. SCHEMA.table3, SCHEMA.table4). In such cases, I do not want to create a new connector config; I will keep adding new tables to the existing connector whenever such a requirement comes up, because I do not want to maintain too many instances of Debezium connector configs.
This is how my updated Debezium connector config would look:
{
  "name": "debezium-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "{{db_endpoint}}",
    "database.port": "{{db_port}}",
    "database.user": "{{db_user}}",
    "database.password": "{{db_pwd}}",
    "database.ssl.mode": "required",
    "database.server.id": "123456",
    "database.server.name": "dev_db",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory",
    "table.include.list": "SCHEMA.table1,SCHEMA.table2,SCHEMA.table3,SCHEMA.table4",
    "event.processing.failure.handling.mode": "fail",
    "include.schema.changes": "true",
    "time.precision.mode": "connect",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$2.$3",
    "decimal.handling.mode": "double",
    "heartbeat.interval.ms": "60000"
  }
}
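Rather than registering a second connector, I apply this change to the existing one via the REST API (a sketch; note that PUT /connectors/{name}/config expects only the inner "config" object, without the "name" wrapper):

# Update the existing connector's config in place
curl -X PUT -H "Content-Type: application/json" \
  --data @debezium-connector-config.json \
  http://localhost:8083/connectors/debezium-connector/config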
As mentioned earlier, I always want "snapshot.mode": "initial" because I want a full snapshot followed by live CDC events. However, whenever I re-register the same connector (now including SCHEMA.table3 and SCHEMA.table4), I do not want the initial snapshot to run again for the earlier tables (SCHEMA.table1 and SCHEMA.table2). My use case can't afford duplicate CDC events.
Right now I am using Debezium version 1.9.
I understand that this question has already been asked by many folks. However, even after trying the following solutions, I did not get the desired results.
Solution 1: Using signalling
Following the steps for incremental snapshots in the Debezium documentation, I sent an execute-snapshot signal, but the connector failed with an error.
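For context, this is roughly what I set up (a sketch; the signal table name SCHEMA.debezium_signal is my own choice, created per the docs with the columns id, type, and data):

# 1) Point the connector at the signal table by adding this to the config above:
#      "signal.data.collection": "SCHEMA.debezium_signal"
# 2) Trigger an ad-hoc incremental snapshot covering only the new tables:
mysql -h "$DB_HOST" -u "$DB_USER" -p"$DB_PWD" -e \
  "INSERT INTO SCHEMA.debezium_signal (id, type, data)
   VALUES ('adhoc-1', 'execute-snapshot',
           '{\"data-collections\": [\"SCHEMA.table3\", \"SCHEMA.table4\"]}');"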
Solution 2: Setting snapshot.new.tables=parallel. Still no luck.
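That is, I added this property to the "config" block above and re-registered the connector:

"snapshot.new.tables": "parallel"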
Solution 3: As mentioned by @Gunnar in this link:
* Add the new tables, but don't write to them yet
* Shut down the connector
* Change the filter configuration and set snapshot mode to "schema_only_recovery"
* Delete (or rename) the existing DB history topic
* Restart the connector; this will re-create the internal history of the DB schema and then continue streaming from the previously recorded offset
* Begin writing to the new tables
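For step 3, I understand the config change would look roughly like this (a sketch based on my reading of the steps):

"snapshot.mode": "schema_only_recovery",
"table.include.list": "SCHEMA.table1,SCHEMA.table2,SCHEMA.table3,SCHEMA.table4"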
With the above solution, I'm afraid that the new tables (SCHEMA.table3, SCHEMA.table4) might not get an initial snapshot, and I would end up not getting historical data/events for these new tables.
I would appreciate it if anybody could help me with this.