
Happy Thursday! I've been experimenting with creating a Postgres connector in Debezium, but I can only capture the changes if the table already exists in my MySQL instance, which isn't ideal, because then I would have to write a Python script to handle such events, and it may be easier to use something that already exists than to reinvent the wheel. I want to be able to capture the DDL in the actual connector.

I came across this blog post, https://debezium.io/blog/2017/09/25/streaming-to-another-database/, and I got it working on my local set-up, which is great; the only issue is that I want to go in the opposite direction. (I am able to capture new records, deleted records, and updated records, and it creates the new tables and new columns as well if they don't exist.) I want to stream from Postgres and have the connector insert into a target database in MySQL. I tried switching the JDBC source and sink connectors respectively, but I wasn't getting the new records inserted from Postgres into MySQL. It seems like I can find people inserting into Postgres from MySQL all over the place, but not in the other direction. Here is the GitHub directory I based my set-up on to get the mysql-kafka-postgres pipeline working: https://github.com/debezium/debezium-examples/tree/main/unwrap-smt

I tried to go a different way, but it seems to be killing my Docker image as it boots up, saying "Couldn't resolve server kafka:9092 from bootstrap.servers as DNS resolution failed for kafka [org.apache.kafka.clients.ClientUtils]". Here are my sink JSON and my source JSON.

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "postgres.public.customers",
        "connection.url": "jdbc:mysql://mysql:3306/inventory",
        "connection.user": "debezium",
        "connection.password": "dbz",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "auto.evolve": "true",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.fields": "id",
        "pk.mode": "record_key"
    }
}


{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "topic.prefix": "psql",
        "mode": "bulk",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgresuser",
        "database.password": "postgrespw",
        "database.dbname": "inventory",
        "table.include.list": "public.customers",
        "slot.name": "test_slot",
        "plugin.name": "wal2json",
        "database.server.name": "psql",
        "tombstones.on.delete": "true",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
        "key.converter.basic.auth.credentials.source": "USER_INFO",
        "key.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
        "value.converter.basic.auth.credentials.source": "USER_INFO",
        "value.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]"
    }
}
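
In case it's relevant, this is roughly how I register the two configs and check whether any topics are being created. The file names, the Connect REST address (localhost:8083), and the Kafka CLI call are placeholders based on my local debezium-examples set-up, not something taken from the blog post:

# Register the sink and source against Kafka Connect's REST API
# (the port and file names below are placeholders from my local set-up)
curl -i -X POST -H "Content-Type: application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
curl -i -X POST -H "Content-Type: application/json" http://localhost:8083/connectors/ -d @pg-source.json

# Check that both connectors report RUNNING
curl -s http://localhost:8083/connectors/jdbc-sink/status
curl -s http://localhost:8083/connectors/inventory-connector/status

# List the topics the source actually produced, e.g. from inside the Kafka container
kafka-topics.sh --bootstrap-server localhost:9092 --list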

Everything else remains the same as in the blog post I followed. Any help is welcome.

Alex

1 Answer


I believe there are two different questions here:

  1. How to handle non-existing columns in MySQL. The JDBC sink connector has a flag called auto.create that, if set to true, allows the connector to create tables if they don't exist (auto.evolve also allows table evolution); see the sketch after this list.

  2. PG -> Kafka -> MySQL is possible; you can find an example of it that I wrote some time ago here. The example uses Aiven for PostgreSQL and Aiven for Apache Kafka, but you should be able to adapt the connectors to work with any kind of PG and Kafka.
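
As a rough illustration of point 1, here is a minimal JDBC sink registered with those two flags. The connector name and the Connect REST address are placeholders, and the connection settings simply mirror the sink config from the question:

# Hypothetical minimal JDBC sink showing auto.create / auto.evolve,
# assuming a Kafka Connect REST API reachable at localhost:8083
curl -i -X POST -H "Content-Type: application/json" http://localhost:8083/connectors/ -d '{
  "name": "mysql-sink-example",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "psql.public.customers",
    "connection.url": "jdbc:mysql://mysql:3306/inventory",
    "connection.user": "debezium",
    "connection.password": "dbz",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id"
  }
}'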

It would be interesting to know where your PG -> Kafka -> MySQL pipeline stops working.

Disclaimer: I work for Aiven

Ftisiot
  • Thank you @Ftisiot, apparently Google needs to improve their algorithms. This should be the first post that appears, but apparently it was quite hidden. Thank you so much! I will try it out! – Alex Jul 22 '22 at 15:41
  • hey @Ftisiot, I tried implementing it in the opposite direction but it seems it is killing my image and giving me an error. " Couldn't resolve server kafka:9092 from bootstrap.servers as DNS resolution failed for kafka [org.apache.kafka.clients.ClientUtils]" – Alex Jul 25 '22 at 21:20
  • @Alex Assuming you are using Docker Compose, then you need to [ensure that `kafka` is the advertised name of the Kafka container](https://stackoverflow.com/questions/51630260/connect-to-kafka-running-in-docker) – OneCricketeer Jul 26 '22 at 02:05
  • As @OneCricketeer said, the error is related to networking between connect and kafka and not about the connector itself – Ftisiot Jul 26 '22 at 07:18
  • @Ftisiot it seems like I'm not getting any topics pushed into Kafka with my set-up and then curling the source.json file, and I'm not sure why. I will put the source.json file in the original post. – Alex Jul 27 '22 at 03:30
  • also tried it with pgoutput and all sorts of things, I think I was forgetting to start over my docker compose and go from scratch so I may have had the right configuration at one time.. whew this is a mess. – Alex Jul 27 '22 at 03:32
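
A quick way to check the networking issue described in the last two comments; the service names connect and kafka are assumptions based on the debezium-examples compose file, not confirmed from this set-up:

# Run from the host: can the Connect container resolve the broker hostname,
# and what does the broker's environment say about its listeners?
docker compose exec connect getent hosts kafka
docker compose exec kafka env | grep -i listener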