Happy Thursday! I've been experimenting with the Debezium Postgres connector, but I can only capture changes if the table already exists in my MySQL target, which isn't ideal. Otherwise I'd have to write a Python script to handle such events, and it's probably easier to use something that already exists than to reinvent the wheel. I want the connector itself to handle the DDL.

I came across this blog post, https://debezium.io/blog/2017/09/25/streaming-to-another-database/, and got it working on my local setup, which is great: I can capture new, updated, and deleted records, and it creates new tables and new columns if they don't exist. The only issue is that I want to go in the opposite direction: stream from Postgres and have the connector insert into a target MySQL database. I tried swapping the JDBC source and sink connectors accordingly, but new records from Postgres were never inserted into MySQL. I can find people inserting into Postgres from MySQL all over the place, but not the other direction. Here is the GitHub directory I based my setup on to get mysql-kafka-postgres working: https://github.com/debezium/debezium-examples/tree/main/unwrap-smt
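For reference, I register both connectors against the Kafka Connect REST API the way the example repo does; a minimal sketch, assuming Connect is exposed on localhost:8083 and the configs are saved as source.json and jdbc-sink.json (the file names are mine):

# Register the Debezium Postgres source connector
curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" \
  http://localhost:8083/connectors/ -d @source.json

# Register the JDBC sink connector that writes into MySQL
curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" \
  http://localhost:8083/connectors/ -d @jdbc-sink.json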
I tried to go a different way, but it seems to kill my Docker container on startup with: "Couldn't resolve server kafka:9092 from bootstrap.servers as DNS resolution failed for kafka [org.apache.kafka.clients.ClientUtils]". A quick DNS check I ran from inside the Connect container is below, followed by my sink and source JSON configs.
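To rule out a networking problem, I checked name resolution from inside the Connect container; a rough sketch, assuming the service is called connect in my docker-compose file and everything shares one compose network:

# Does the connect container resolve the kafka service name?
docker compose exec connect getent hosts kafka

# Can it open a TCP connection to the broker port? (bash /dev/tcp trick)
docker compose exec connect bash -c 'echo > /dev/tcp/kafka/9092 && echo reachable'

Here are my sink and source JSON configs. First the JDBC sink: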
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "postgres.public.customers",
    "connection.url": "jdbc:mysql://mysql:3306/inventory",
    "connection.user": "debezium",
    "connection.password": "dbz",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "id",
    "pk.mode": "record_key"
  }
}
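To confirm the source side is producing anything at all, I tail the topic with the console consumer bundled in the Kafka container; a sketch, assuming the Debezium example image (the script path may differ) and that the topic name follows topic.prefix. Since I switched the converters to Avro, the values print as raw bytes, but it still shows whether events are flowing:

# Watch the change-event topic from the beginning
docker compose exec kafka bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic psql.public.customers \
  --from-beginning

And the Postgres source connector: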
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "topic.prefix": "psql",
    "mode": "bulk",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgresuser",
    "database.password": "postgrespw",
    "database.dbname": "inventory",
    "table.include.list": "public.customers",
    "slot.name": "test_slot",
    "plugin.name": "wal2json",
    "database.server.name": "psql",
    "tombstones.on.delete": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
    "key.converter.basic.auth.credentials.source": "USER_INFO",
    "key.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]"
  }
}
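When no rows showed up in MySQL, I also checked connector and task status via the standard Connect REST endpoints, and verified on the Postgres side that the replication slot was created and active; a sketch, assuming the container names from the example compose file:

# Any FAILED tasks? The trace field usually names the real error
curl -s http://localhost:8083/connectors/inventory-connector/status
curl -s http://localhost:8083/connectors/jdbc-sink/status

# Is the wal2json slot there and active?
docker compose exec postgres psql -U postgresuser -d inventory \
  -c "SELECT slot_name, plugin, active FROM pg_replication_slots;"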
Everything else remains the same as in the blog post I followed. Any help is welcome.