I'm trying to get Debezium to run against a local MySQL instance after getting it to work with a cloud-hosted MySQL server. With the local instance it does not take or export the snapshot of my target database, even though it does so for the cloud-hosted MySQL.
Here is my docker-compose.yml file:
volumes:
  mysql-data:
  mysql-log:
  mysql-conf:
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:latest
    ports:
      - 9092:9092
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  kafka-connect:
    image: docker.io/debezium/connect:latest
    ports:
      - 8083:8083
    depends_on:
      - zookeeper
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my-connect-configs
      - OFFSET_STORAGE_TOPIC=my-connect-offsets
      - STATUS_STORAGE_TOPIC=my-connect-statuses
  nmap:
    image: docker.io/instrumentisto/nmap
    entrypoint: /bin/sh
    stdin_open: true
    tty: true
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    depends_on:
      - zookeeper
      - kafka
      - kafka-connect
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_JMXPORT: 9997
  mysql:
    image: mysql:5.7
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=password
      - MYSQL_DATABASE=testdb
      - MYSQL_USER=test
      - MYSQL_PASSWORD=test
    volumes:
      - mysql-data:/var/lib/mysql
      - mysql-log:/var/log/mysql
      - mysql-conf:/etc/mysql/conf.d
      - /Users/Name/repos/debezium/custom-config.cnf:/etc/mysql/conf.d/custom-config.cnf:ro
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
      interval: 20s
      retries: 10
  consumer:
    image: python-kafka-consumer:latest
I had issues with my local MySQL user and its grants, so I set Super_priv, Repl_client_priv and Repl_slave_priv to 'Y' using these queries:
UPDATE mysql.user SET Super_Priv='Y' WHERE user='test' AND host='%';
UPDATE mysql.user SET Repl_client_priv='Y' WHERE user='test' AND host='%';
UPDATE mysql.user SET Repl_slave_priv='Y' WHERE user='test' AND host='%';
FLUSH PRIVILEGES;
Below are the commands I used up to this point. Any help would be much appreciated!
I ran this curl command to register the Debezium MySQL connector pointed at my test table in my local MySQL:
curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" localhost:8083/connectors/ \
  -d '{ "name": "local-mysql-test-connector", "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1", "database.hostname": "host.docker.internal",
    "database.port": "3306", "database.user": "test",
    "database.password": "test", "database.server.id": "100",
    "database.server.name": "local_mysql", "database.include.list": "testdb.Persons",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.mysql" } }'
This is the curl command for the Cloud MySQL Connector:
curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" localhost:8083/connectors/ \
  -d '{ "name": "test-connector", "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1", "database.hostname": "host.docker.internal",
    "database.port": "3310", "database.user": "user",
    "database.password": "password", "database.server.id": "4564865",
    "database.server.name": "test_server", "database.include.list": "testDB",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.testDB" } }'
Then the Debezium/Kafka Connect logs show that it reads the test database ('testdb') that I am aiming it at, but it neither takes a snapshot of the table nor exports it.
MySQL|local_mysql|snapshot Snapshot step 7 - Snapshotting data [io.debezium.relational.RelationalSnapshotChangeEventSource]
MySQL|local_mysql|snapshot Snapshotting contents of 0 tables while still in transaction [io.debezium.relational.RelationalSnapshotChangeEventSource]
and then in the snapshot result it confirms this with:
tableIds=[null], databaseName=testdb]
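One difference between the two connector configs that may be worth checking: the local one puts `testdb.Persons` in `database.include.list`, while the cloud one puts only a database name (`testDB`) there. As far as the Debezium documentation describes it, `database.include.list` entries are regular expressions matched against database names, and table-level filtering goes through a separate `table.include.list` property matched against `database.table` identifiers. The sketch below only imitates that matching with Python's `re.fullmatch` to show what such a filter would do with these values. The helper name `matches_include_list` is hypothetical, the whole-name (anchored) matching is an assumption, and this is not Debezium's actual code.

```python
import re


def matches_include_list(include_list: str, name: str) -> bool:
    """Return True if any comma-separated pattern matches the whole name.

    Assumption: patterns are treated as anchored regular expressions,
    i.e. a pattern has to match the entire name, not just a part of it.
    """
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    return any(re.fullmatch(p, name) is not None for p in patterns)


if __name__ == "__main__":
    # Database-level filter with the value from the local connector config.
    print(matches_include_list("testdb.Persons", "testdb"))
    # Database-level filter with just the database name.
    print(matches_include_list("testdb", "testdb"))
    # Table-level filter against a fully qualified table identifier.
    print(matches_include_list("testdb.Persons", "testdb.Persons"))
```

If the real filter behaves like this sketch, a database-level pattern of `testdb.Persons` would not select the database `testdb`, which would be consistent with the "0 tables" line in the log.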
Whereas the connector connected to the cloud MySQL picks up the table it's pointed to, takes a snapshot of it and exports it correctly:
MySQL|test_server|snapshot Snapshot step 7 - Snapshotting data [io.debezium.relational.RelationalSnapshotChangeEventSource]
MySQL|test_server|snapshot Snapshotting contents of 1 tables while still in transaction [io.debezium.relational.RelationalSnapshotChangeEventSource]
MySQL|test_server|snapshot Exporting data from table 'testDB.test_table' (1 of 1 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
MySQL|test_server|snapshot For table 'testDB.test_table' using select statement: 'SELECT `id`, `event_code`, `event_date`, `created_at`, `modified_at` FROM `testDB`.`test_table`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
MySQL|test_server|snapshot Finished exporting 18 records for table 'testDB.test_table'; total duration '00:00:00.087' [io.debezium.relational.RelationalSnapshotChangeEventSource]
MySQL|test_server|snapshot Snapshot - Final stage [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
MySQL|test_server|snapshot Snapshot ended with SnapshotResult [status=COMPLETED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.003725, currentBinlogPosition=62104189, currentRowNumber=0, serverId=0, sourceTime=2022-06-25T18:37:49.244Z, threadId=-1, currentQuery=null, tableIds=[testDB.test_table], databaseName=youth_economy], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=c7bbd9dd-bf97-11e9-8a95-42010a2d0007:1-38268404, currentGtidSet=c7bbd9dd-bf97-11e9-8a95-42010a2d0007:1-38268404, restartBinlogFilename=mysql-bin.003725, restartBinlogPosition=62104189, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator]
2022-06-25 18:37:49,428 INFO MySQL|test_server|binlog Connected to MySQL binlog at host.docker.internal:3310, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.003725, currentBinlogPosition=62104189, currentRowNumber=0, serverId=0, sourceTime=2022-06-25T18:37:49.244Z, threadId=-1, currentQuery=null, **tableIds=[testDB.test_table]**, databaseName=youth_economy], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=c7bbd9dd-bf97-11e9-8a95-42010a2d0007:1-38268404, currentGtidSet=c7bbd9dd-bf97-11e9-8a95-42010a2d0007:1-38268404, restartBinlogFilename=mysql-bin.003725, restartBinlogPosition=62104189, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]] [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
Please Help!!!