1

I am trying to follow the tutorial "Deploying Debezium using the new KafkaConnector resource". As in the tutorial, I am using minikube, but with the docker driver. Otherwise, I followed it exactly, step by step.

However, in the step "Create the connector", after creating the connector with

cat <<EOF | kubectl -n kafka apply -f -
apiVersion: "kafka.strimzi.io/v1alpha1"
kind: "KafkaConnector"
metadata:
  name: "inventory-connector"
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: 192.168.99.1
    database.port: "3306"
    database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
    database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
    database.server.id: "184054"
    database.server.name: "dbserver1"
    database.whitelist: "inventory"
    database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    database.history.kafka.topic: "schema-changes.inventory"
    include.schema.changes: "true" 
EOF

and check by

kubectl -n kafka get kctr inventory-connector -o yaml

I got this error:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"192.168.49.2","database.password":"","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
  creationTimestamp: "2021-09-29T18:20:11Z"
  generation: 1
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: inventory-connector
  namespace: kafka
  resourceVersion: "12777"
  uid: 083df9a3-83ce-4170-a9bc-9573dafdb286
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    database.hostname: 192.168.49.2
    database.password: ""
    database.port: "3306"
    database.server.id: "184054"
    database.server.name: dbserver1
    database.user: ""
    database.whitelist: inventory
    include.schema.changes: "true"
  tasksMax: 1
status:
  conditions:
  - lastTransitionTime: "2021-09-29T18:20:11.548Z"
    message: |-
      PUT /connectors/inventory-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
      A value is required
      You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    reason: ConnectRestException
    status: "True"
    type: NotReady
  observedGeneration: 1

I tried to change

database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"

to

database.user: "debezium"
database.password: "dbz"

directly and re-applied, based on the user and password info in the "Secure the database credentials" step.

Also, based on the description in the tutorial

I’m using database.hostname: 192.168.99.1 as IP address for connecting to MySQL because I’m using minikube with the virtualbox VM driver. If you’re using a different VM driver with minikube you might need a different IP address.

I am actually a little confused by the above description. MySQL in the demo is deployed in Docker, while the other parts, like Kafka, are deployed in minikube. Why does the description of database.hostname refer to minikube instead of Docker?

Anyway, when I ran minikube ip, I got 192.168.49.2. However, after I changed database.hostname to 192.168.49.2 and ran kubectl get kctr inventory-connector -o yaml -n kafka, I got

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"192.168.49.2","database.password":"","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
  creationTimestamp: "2021-09-29T18:20:11Z"
  generation: 1
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: inventory-connector
  namespace: kafka
  resourceVersion: "12777"
  uid: 083df9a3-83ce-4170-a9bc-9573dafdb286
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    database.hostname: 192.168.49.2
    database.password: ""
    database.port: "3306"
    database.server.id: "184054"
    database.server.name: dbserver1
    database.user: ""
    database.whitelist: inventory
    include.schema.changes: "true"
  tasksMax: 1
status:
  conditions:
  - lastTransitionTime: "2021-09-29T18:20:11.548Z"
    message: |-
      PUT /connectors/inventory-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
      A value is required
      You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    reason: ConnectRestException
    status: "True"
    type: NotReady
  observedGeneration: 1

I can access MySQL via localhost, as it is hosted in Docker. However, I still got the same error when I changed database.hostname to localhost.

Any idea? Thanks!

Hongbo Miao
  • 45,290
  • 60
  • 174
  • 267
  • The error seems to be 400 Bad Request, so it might have connected to the host, but your input was perhaps invalid. Can you try validating your configs? – JavaTechnical Sep 29 '21 at 18:10
  • Does using double quotes "" in database hostname and other fields help? – JavaTechnical Sep 29 '21 at 18:17
  • Thanks @JavaTechnical after adding quote like `database.hostname: "192.168.49.2"`, still same. – Hongbo Miao Sep 29 '21 at 18:23
  • Did you add "" for all? Also try adding `connector.class` to config and check – JavaTechnical Sep 29 '21 at 18:29
  • @JavaTechnical hmm what do you mean? In spec -> config, only `database.hostname` value does not have `""`. – Hongbo Miao Sep 29 '21 at 18:31
  • Others also do not have, `database.history.kafka.bootstrap.servers`,`database.history.kafka.topic`, `database.server.name`, `database.whitelist`. Also check these required props: https://debezium.io/documentation/reference/connectors/mysql.html#mysql-required-connector-configuration-properties – JavaTechnical Sep 29 '21 at 18:34
  • @JavaTechnical oh, when I run `kubectl apply` (written in my question), I do have `""` for all fields now (also added `""` for `database.hostname` value). However, after running `kubectl -n kafka get kctr inventory-connector -o yaml`, they just disappear. – Hongbo Miao Sep 29 '21 at 18:37

1 Answer

1

The issue is that the service in minikube failed to communicate with the MySQL instance running in Docker.

Regarding how to access the host's localhost from inside a Kubernetes cluster, I found the question "How to access host's localhost from inside kubernetes cluster".

However, I ended up deploying MySQL directly in Kubernetes with

kubectl apply -f https://k8s.io/examples/application/mysql/mysql-pv.yaml
kubectl apply -f https://k8s.io/examples/application/mysql/mysql-deployment.yaml

(Copied from https://kubernetes.io/docs/tasks/run-application/run-single-instance-stateful-application/)

with

database.hostname: "mysql.default" # service `mysql` in namespace `default`
database.port: "3306"
database.user: "root"
database.password: "password"

Now when I run

kubectl -n kafka get kctr inventory-connector -o yaml

I got a new error saying that MySQL is not configured to use a row-level binlog. However, this means the connector can now connect to MySQL.

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"mysql.default","database.password":"password","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"root","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
  creationTimestamp: "2021-09-29T19:36:52Z"
  generation: 1
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: inventory-connector
  namespace: kafka
  resourceVersion: "2918"
  uid: 48bb46e1-42bb-4574-a3dc-221ae7d6a803
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    database.hostname: mysql.default
    database.password: password
    database.port: "3306"
    database.server.id: "184054"
    database.server.name: dbserver1
    database.user: root
    database.whitelist: inventory
    include.schema.changes: "true"
  tasksMax: 1
status:
  conditions:
  - lastTransitionTime: "2021-09-29T19:36:53.605Z"
    status: "True"
    type: Ready
  connectorStatus:
    connector:
      state: UNASSIGNED
      worker_id: 172.17.0.8:8083
    name: inventory-connector
    tasks:
    - id: 0
      state: FAILED
      trace: "org.apache.kafka.connect.errors.ConnectException: The MySQL server is
        not configured to use a row-level binlog, which is required for this connector
        to work properly. Change the MySQL configuration to use a row-level binlog
        and restart the connector.\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:207)\n\tat
        io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)\n\tat
        org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat
        org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat
        org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat
        java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
        java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
        java.lang.Thread.run(Thread.java:748)\n"
      worker_id: 172.17.0.8:8083
    type: source
  observedGeneration: 1
Hongbo Miao
  • 45,290
  • 60
  • 174
  • 267