
I'm trying to sink table data from one DB to another using Kafka and Debezium (Kafka streaming), running in Docker. The DB stream itself works fine, but sinking the streamed data into the second MySQL DB fails with an error.

My sink connector configuration is as follows:

 {
  "name": "mysql_sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "mysql-connect.kafka_test.employee",
    "connection.url": "jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value",
    "errors.tolerance": "all",
    "errors.log.enable":"true",
    "errors.log.include.messages":"true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "name": "mysql_sink"
  }
}

But I'm getting this error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx
io.confluent.connect.jdbc.util.CachedConnectionProvider.getValidConnection(CachedConnectionProvider.java:59)
io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:52)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	... 10 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx
java.sql.DriverManager.getConnection(DriverManager.java:689)
java.sql.DriverManager.getConnection(DriverManager.java:247)
io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:66)
io.confluent.connect.jdbc.util.CachedConnectionProvider.getValidConnection(CachedConnectionProvider.java:52)
	... 13 more

I'm running everything with Docker. This is my Docker Compose file:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
     - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    command: [start-kafka.sh]
    ports:
     - "9092:9092"
    links:
     - zookeeper
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092,
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
  connect:
    build:
      context: debezium-jdbc
    ports:
     - "8083:8083"
    links:
     - kafka
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      CLASSPATH: /kafka/connect/kafka-connect-jdbc-5.3.1.jar

I've tried many things and I don't know why I'm getting this error. One more thing: I don't have any knowledge of Java.

Thanks in advance.

Satz K
  • Relevant error: _"Caused by: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx"_, in other words you don't have the MySQL Connector/J driver on the classpath or the driver is not loaded – Mark Rotteveel Nov 21 '19 at 11:28
  • What is the configuration for `plugins.path` in your `server.properties` file? – Giorgos Myrianthous Nov 21 '19 at 11:53
  • 1
    @GiorgosMyrianthous I checked server.properties here `plugins.path` never mentioned. One more thing Initial `io.debezium.connector.mysql.MySqlConnector` is working fine. But `io.confluent.connect.jdbc.JdbcSinkConnector` only getting error. – Satz K Nov 21 '19 at 12:19
  • @MarkRotteveel I'm facing this issue when streaming the Kafka data to the MySQL DB. `io.debezium.connector.mysql.MySqlConnector` works fine. – Satz K Nov 21 '19 at 12:30
  • Can you share the docker file as well? There should be one config parameter `CONNECT_PLUGIN_PATH ` in it. – Giorgos Myrianthous Nov 21 '19 at 13:04
  • @GiorgosMyrianthous this is my docker compose file for connect `connect: build: context: debezium-jdbc ports: - "8083:8083" links: - kafka environment: BOOTSTRAP_SERVERS:kafka:9092 GROUP_ID: 1 CONFIG_STORAGE_TOPIC: my_connect_configs OFFSET_STORAGE_TOPIC: my_connect_offsets` – Satz K Nov 21 '19 at 13:50
  • under `environment`, add the config `CONNECT_PLUGIN_PATH=/opt/kafka/plugins/` and make sure all of you jar files (including debezium's and MySQL Connector/J driver) are located under `/opt/kafka/plugins/` (or any other directory of your preference). Finally restart kafka so that connectors are reloaded and it should work without any issues. – Giorgos Myrianthous Nov 21 '19 at 13:59
  • 2
    This question is a duplicate but the answer linked to is not helpful. I've voted to reopen it, in the meantime I can recommend the details given at https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector regarding this error. – Robin Moffatt Nov 21 '19 at 14:46

3 Answers

5

You're getting this error because the JDBCSink (and JDBCSource) connectors use JDBC (as the name implies) to connect to the database, and you have not made the JDBC driver for MySQL available to the connector.

The best way to fix this is to copy the MySQL JDBC driver into the same folder as kafka-connect-jdbc (which on the Docker image is /usr/share/java/kafka-connect-jdbc/).

If you're using Docker Compose, you have three options:

  1. Build a custom Docker image with the driver installed

  2. Download the driver locally

    # Download to host machine
    mkdir local-jdbc-drivers
    cd local-jdbc-drivers
    curl https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.18.tar.gz | tar xz 
    

    and mount it into the container into the path of Kafka Connect JDBC:

    volumes:
      - ${PWD}/local-jdbc-drivers:/usr/share/java/kafka-connect-jdbc/driver-jars/
    
  3. Install it at runtime like this:

    command: 
      - /bin/bash
      - -c 
      - |
        # JDBC Drivers
        # ------------
        # MySQL
        cd /usr/share/java/kafka-connect-jdbc/
        curl https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.18.tar.gz | tar xz 
        # Now launch Kafka Connect
        sleep infinity &
        /etc/confluent/docker/run 
    

For more details see this blog.
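For option 1, the Dockerfile could be a sketch like this (the base image and tag are assumptions; it bakes in the same curl | tar step used in option 3):

```dockerfile
# Sketch only: base image/tag is an assumption — use whichever Connect image you run.
FROM confluentinc/cp-kafka-connect:5.3.1

# Bake the MySQL JDBC driver into the same folder as kafka-connect-jdbc,
# so the driver is on the connector's classpath at startup.
RUN curl -s https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.18.tar.gz \
    | tar xz -C /usr/share/java/kafka-connect-jdbc/
```

Then point the `connect` service's `build:` context at the directory containing this Dockerfile, as the question's Compose file already does.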

Robin Moffatt
  • thanks for your solution. will check and get back to you. – Satz K Nov 22 '19 at 05:38
  • I tried this step but I'm still getting an error. I'm using Docker and sinking the data to an external MySQL DB. Is it possible to check the connection URL? The same URL works in the initial step for `io.debezium.connector.mysql.MySqlConnector`; only when connecting `io.confluent.connect.jdbc.JdbcSinkConnector` do I get an error. – Satz K Nov 22 '19 at 12:37
  • How are you running it in Docker? Can you edit your question to show the full configuration of Docker Compose? – Robin Moffatt Nov 22 '19 at 12:39
  • I checked the loaded plugins, and all the plugins are loaded. I have a question: the error could come from two things, either the plugin is not loaded or the JDBC URL is incorrectly specified. I'm using the same hostname, user and password; only the DB names are different. Is there any way to debug the DB connection? Thanks. – Satz K Nov 22 '19 at 12:51
  • it's not about the plugin being loaded, it's about the MySQL JDBC driver JAR being in the `kafka-connect-jdbc` folder – Robin Moffatt Nov 22 '19 at 16:36
  • But my initial MySQL connection stream process is working fine. – Satz K Nov 24 '19 at 14:03
  • It's not working fine, if you're getting that error. You're conflating one plugin (Debezium) with another (JDBC Sink); they're completely independent. Don't use `CLASSPATH`; make sure that the JDBC JAR is in the `kafka-connect-jdbc` folder. – Robin Moffatt Nov 25 '19 at 09:15
2

I have been struggling a lot with the same No suitable driver found error when trying to load a MySQL table using Kafka Connect.

I am using plain Kafka (not the Confluent Platform) and found that the cause can be one of two problems:

  • the JDBC URL is malformed
  • the driver chosen for your Kafka is not the right one

I used the latest driver, mysql-connector-java-8.0.21, and received the no suitable driver error. However, when I switched to version mysql-connector-java-5.1.49 (released in 2020), everything worked like a charm.

You can get the driver versions from the Maven repository: https://mvnrepository.com/artifact/mysql/mysql-connector-java

Copy the driver onto the classpath; in my case I downloaded Kafka and copied the jar into the kafka_2.12-2.3.1/libs/ directory.

Federico Piazza
  • I was getting the same error with MSK Connect and `kafka-connect-jdbc:10.2.5`, and fixed it by updating Postgres to the latest version, `postgresql:42.3.1`. Earlier it was using `postgresql:42.2.19` from kafka-connect-jdbc's pom.xml; ideally that should have worked. I don't know why I got the error in the first place. – Snigdhajyoti Dec 30 '21 at 10:25
0

My problem was actually a little funny. I had the necessary jar file in my plugin path, so everything was fine up to that point. But I had three copies of the same jar file located in different folders, so I searched for them using:

find /kafka/ -name 'ojdbc*.jar'

and removed two of them. After restarting the service, everything started working normally. It's a long shot, but you may have the same problem :p
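A quick way to spot such duplicates is to list jar basenames that occur more than once under the plugin path. A sketch (the demo builds a throwaway directory; in a real setup point `PLUGIN_DIR` at `/kafka/` or wherever your plugins live):

```shell
# Demo setup: a throwaway tree with one jar duplicated.
# In real usage, set PLUGIN_DIR=/kafka/ (or your plugin path) instead.
PLUGIN_DIR=$(mktemp -d)
mkdir -p "$PLUGIN_DIR/a" "$PLUGIN_DIR/b"
touch "$PLUGIN_DIR/a/ojdbc8.jar" "$PLUGIN_DIR/b/ojdbc8.jar" "$PLUGIN_DIR/a/other.jar"

# List basenames that appear more than once — delete all but one copy of each.
find "$PLUGIN_DIR" -name '*.jar' -exec basename {} \; | sort | uniq -d
# → ojdbc8.jar
```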