
I have the following requirements:

  1. On Day 1: I configured the Debezium connector config below with a total of 2 tables in the include list. I wanted a complete snapshot for my use case, so I purposely kept the default value for snapshot.mode (initial).
{
    "name": "debezium-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "{{db_endpoint}}",
        "database.port": "{{db_port}}",
        "database.user": "{{db_user}}",
        "database.password": "{{db_pwd}}",
        "database.ssl.mode": "required",
        "database.server.id": "123456",
        "database.server.name": "dev_db",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "dbhistory",
        "table.include.list": "SCHEMA.table1,SCHEMA.table2",
        "event.processing.failure.handling.mode": "fail",
        "include.schema.changes": "true",
        "time.precision.mode": "connect",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$2.$3",
        "decimal.handling.mode": "double",
        "heartbeat.interval.ms": "60000"
    }
}
  2. On Day 2 or any subsequent day: I got a requirement to add a few more tables (e.g. SCHEMA.table3 and SCHEMA.table4). In such cases I do not want to create a new connector config; I will keep adding new tables to the existing connector as such requirements come up, because I do not want to maintain too many Debezium connector config instances.

So, this is what my updated Debezium connector config would look like:

{
    "name": "debezium-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "{{db_endpoint}}",
        "database.port": "{{db_port}}",
        "database.user": "{{db_user}}",
        "database.password": "{{db_pwd}}",
        "database.ssl.mode": "required",
        "database.server.id": "123456",
        "database.server.name": "dev_db",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "dbhistory",
        "table.include.list": "SCHEMA.table1,SCHEMA.table2,SCHEMA.table3,SCHEMA.table4",
        "event.processing.failure.handling.mode": "fail",
        "include.schema.changes": "true",
        "time.precision.mode": "connect",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$2.$3",
        "decimal.handling.mode": "double",
        "heartbeat.interval.ms": "60000"
    }
}
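
For what it's worth, the way I re-register is an in-place update through the Kafka Connect REST API rather than deleting and re-creating the connector (a sketch, assuming the Connect worker listens on localhost:8083 and the inner config map, without the outer "name"/"config" wrapper, is saved as updated-config.json):

# PUT /connectors/{name}/config updates the connector in place;
# it expects only the flat config map, not the {"name": ..., "config": ...} wrapper
curl -s -X PUT \
  -H "Content-Type: application/json" \
  --data @updated-config.json \
  http://localhost:8083/connectors/debezium-connector/config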

As mentioned earlier, I always want "snapshot.mode": "initial" because I want a full snapshot followed by live CDC events. However, whenever I re-register the same connector (this time to pick up SCHEMA.table3 and SCHEMA.table4), I do not want the initial snapshot process to start again for the earlier tables (SCHEMA.table1 and SCHEMA.table2). My use case can't afford duplicate CDC events.

Right now I am using Debezium version 1.9.

I understand that this question has already been asked by many folks. However, even after trying the following solutions, I did not get the desired results.

Solution 1: Using Signalling

Even after trying to follow all the steps mentioned in the Debezium documentation, I ran into an error.
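
For context, the incremental snapshot signalling setup I tried looked roughly like this (a sketch based on the Debezium 1.9 documentation; the signal table name SCHEMA.debezium_signal is a placeholder, and the signal table itself must also be added to table.include.list):

-- Signal table created up front in the source database (per the Debezium docs)
CREATE TABLE SCHEMA.debezium_signal (
    id   VARCHAR(42) PRIMARY KEY,   -- arbitrary unique id for each signal
    type VARCHAR(32) NOT NULL,      -- signal type, e.g. 'execute-snapshot'
    data VARCHAR(2048) NULL         -- JSON payload for the signal
);

-- Corresponding connector config addition:
--     "signal.data.collection": "SCHEMA.debezium_signal"

-- Ad-hoc incremental snapshot of only the newly added tables
INSERT INTO SCHEMA.debezium_signal (id, type, data)
VALUES ('adhoc-1', 'execute-snapshot',
        '{"data-collections": ["SCHEMA.table3", "SCHEMA.table4"], "type": "incremental"}');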

Solution 2: Using snapshot.new.tables=parallel. Still no luck.
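
For reference, that attempt just added this single (incubating) option to the connector config shown above:

    "snapshot.new.tables": "parallel"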

Solution 3: As mentioned by @Gunnar in this link:

* Add the new tables, but don't write to them yet
* Shut down the connector
* Change the filter configuration and set snapshot mode to "schema_only_recovery" (see the sketch after this list)
* Delete (or rename) the existing DB history topic
* Restart the connector; this will re-create the internal history of the DB schema and then continue streaming from the previously recorded offset
* Begin writing to the new tables
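
In concrete terms, I read steps 3 and 4 of that procedure as something like the following (a sketch; only the changed connector config keys are shown, and the kafka-topics invocation assumes the broker and history topic from my config above):

Changed connector config keys:

    "snapshot.mode": "schema_only_recovery",
    "table.include.list": "SCHEMA.table1,SCHEMA.table2,SCHEMA.table3,SCHEMA.table4"

Delete the existing DB history topic so the connector can re-create it on restart:

    kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic dbhistory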

With the above solution, I'm afraid that the new tables (SCHEMA.table3 and SCHEMA.table4) might not get an initial snapshot, and I would end up not getting historical data/events for these new tables.

I'd appreciate it if anybody could help me with this.
