0

I'm using Docker with Kafka and ClickHouse. I want to connect a ksqlDB table to ClickHouse using Kafka Connect, so I referred to this document and modified my docker-compose file.

here is my docker-compose

---
# Confluent Platform 6.2.1 stack plus ClickHouse, wired for a
# ksqlDB -> Kafka Connect (JDBC sink) -> ClickHouse pipeline.
# Internal clients use broker:29092; the host uses localhost:9092.
version: '2'
services:
  # Cluster coordination for the Kafka broker.
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  # Single Kafka broker; replication factors are 1 because there is one broker.
  broker:
    image: confluentinc/cp-server:6.2.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  # Avro schema storage for the Avro converters used by Connect and ksqlDB.
  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  # Kafka Connect worker, built locally so the ClickHouse JDBC driver can be
  # baked into the JDBC connector's plugin directory.
  connect:
    container_name: connect
    build:
      context: ./
      dockerfile: Dockerfile-kafka-connect
      args:
        clickHouseVersion: "0.2.4"
    depends_on:
      - broker
      - schema-registry
      - zookeeper
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # CLASSPATH required due to CC-2422
      # CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.2.1.jar
      # CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      # CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      # confluent-hub installs plugins under /usr/share/confluent-hub-components.
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  # Confluent Control Center web UI (http://localhost:9021).
  control-center:
    image: confluentinc/cp-enterprise-control-center:6.2.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  # ksqlDB server; KSQL_KSQL_CONNECT_URL lets CREATE ... CONNECTOR statements
  # be forwarded to the Connect worker.
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:6.2.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  # Interactive ksql CLI: docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:6.2.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  # Waits for the broker and Schema Registry, then idles; use it to run datagen.
  ksql-datagen:
    image: confluentinc/ksqldb-examples:6.2.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  # REST access to Kafka (http://localhost:8082).
  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.2.1
    depends_on:
      - broker
      - schema-registry
    ports:
      # Quoted: unquoted COLON-separated port mappings can be parsed as
      # base-60 integers by YAML 1.1 parsers.
      - "8082:8082"
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  # Sink database; JDBC interface is served on port 8123 inside the network.
  clickhouse:
    image: yandex/clickhouse-server:20.4
    hostname: clickhouse
    container_name: clickhouse

And This is Dockerfile-kafka-connect

# Kafka Connect worker with the Confluent JDBC connector and the ClickHouse
# JDBC driver dropped into the connector's plugin lib directory.
FROM confluentinc/cp-kafka-connect-base:6.2.1

# NOTE(review): jdbcDriverPath is declared but never referenced below.
ARG jdbcDriverPath
ARG clickHouseVersion

# Jars placed here are loaded by the JDBC connector's plugin classloader
# (the directory sits under CONNECT_PLUGIN_PATH).
ENV JDBC_DRIVER_PATH=/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
ENV JDBC_DRIVER=clickhouse-jdbc-$clickHouseVersion.jar

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest

# NOTE(review): only the bare driver jar is fetched. The NoClassDefFoundError
# below shows clickhouse-jdbc also needs the Apache HttpClient jars at runtime;
# also confirm the release tag format ('release_X' vs 'vX') for this version.
RUN mkdir -p $JDBC_DRIVER_PATH && \
    echo "Downloading JDBC Driver for ClickHouse v$clickHouseVersion" && \
    wget -O ${JDBC_DRIVER_PATH}/${JDBC_DRIVER} https://github.com/ClickHouse/clickhouse-jdbc/releases/download/release_$clickHouseVersion/${JDBC_DRIVER}

And I typed this command in 'KsqlDB'.

-- This is a *sink* connector (connector.class is JdbcSinkConnector and it
-- consumes the 'topics' list), so it must be declared with CREATE SINK
-- CONNECTOR; the WITH clause closes with ')' and the statement ends with ';'.
CREATE SINK CONNECTOR `clickhouse-kospi-connector` WITH (
    "value.converter.schema.registry.url"= "http://schema-registry:8081",
    "key.converter.schema.registry.url"= "http://schema-registry:8081",
    "connector.class"= "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max"= "1",
    "key.converter"= "io.confluent.connect.avro.AvroConverter",
    "value.converter"= "io.confluent.connect.avro.AvroConverter",
    "errors.log.enable"= "true",
    "errors.log.include.messages"= "true",
    "topics"= "S3_FINAL",
    "connection.url"= "jdbc:clickhouse://clickhouse:8123/default",
    "auto.create"= "true"
  );

('S3_FINAL' topic has AVRO format for both multi-key and values.)

But it doesn't work, so I read the error message:

ERROR WorkerSinkTask{id=clickhouse-kospi-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: org/apache/http/conn/HttpClientConnectionManager (org.apache.kafka.connect.runtime.WorkerSinkTask)

java.lang.NoClassDefFoundError: org/apache/http/conn/HttpClientConnectionManager

at ru.yandex.clickhouse.ClickHouseConnectionImpl.<init>(ClickHouseConnectionImpl.java:73)

at ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:55)

at ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:47)

at ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:29)

at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:677)

at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189)

at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:239)

at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80)

at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52)

at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64)

at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)

at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)

at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)

at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)

at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)

at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)

at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)

at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)

at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

at java.base/java.lang.Thread.run(Thread.java:829)

Caused by: java.lang.ClassNotFoundException: org.apache.http.conn.HttpClientConnectionManager

at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)

at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)

at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)

at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)

... 22 more

I looked into the problem, but I don't know how to supply the missing jar while using Docker. Since this is a CLASSPATH problem, I think it is a configuration issue. Could you help me?

jelarum
  • 11
  • 2
  • Is the error from ksql container or Connect container? Have you tried to wget the jar for that class too? The jdbc connector doesn't need an HTTP client, so that's not going to be installed with Confluent hub – OneCricketeer Nov 02 '21 at 14:10
  • 1
    @OneCricketeer It was solved when 'httpcomponents-client-4.5.13' was downloaded through wget. I think 'httpclient' was needed in 'clickhouse-jdbc'. – jelarum Nov 03 '21 at 07:08
  • But there is another problem. The error 'org.apache.kafka.connect.errors.ConnectException: null (STRING) type doesn't have mapping to the SQL database column type at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1911) at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1827)' appeared, and I found the cause, but I could not take appropriate action. Do you know of any related questions? (github.com/confluentinc/kafka-connect-jdbc/issues/923) – jelarum Nov 03 '21 at 07:18
  • 1
    oh my god... I did it... Thank you so much for your reply! – jelarum Nov 03 '21 at 07:31
  • Feel free to answer your own questions below rather than in the comments – OneCricketeer Nov 03 '21 at 12:54

1 Answer

1

It was solved when 'httpcomponents-client-4.5.13' was downloaded via wget. It seems 'httpclient' is required by 'clickhouse-jdbc'. I'm using clickhouse-jdbc v0.2.6.

# Kafka Connect worker with the Confluent JDBC connector, the Avro converter,
# the ClickHouse JDBC driver, and the Apache HttpClient jars the driver needs
# at runtime (it references org.apache.http.conn.HttpClientConnectionManager).
FROM confluentinc/cp-kafka-connect-base:6.2.1

# NOTE(review): jdbcDriverPath is declared but never referenced below.
ARG jdbcDriverPath
ARG clickHouseVersion

# Jars placed in the connector's lib directory are seen by its plugin classloader.
ENV JDBC_DRIVER_PATH=/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
ENV HTTP_COMP_PATH=/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc
ENV JDBC_DRIVER=clickhouse-jdbc-$clickHouseVersion.jar
ENV HTTP_CLIENT_VERSION=4.5.13

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest && \
    confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:6.2.1

# Fetch the ClickHouse driver, then unpack only the HttpClient lib/*.jar files
# directly into the same plugin lib directory and delete the tarball, all in
# one layer so neither the archive nor the docs bloat the image.
# archive.apache.org is used because dlcdn.apache.org only carries the current
# release and returns 404 for superseded versions such as 4.5.13.
RUN mkdir -p $JDBC_DRIVER_PATH && \
    echo "Downloading JDBC Driver for ClickHouse v$clickHouseVersion" && \
    wget -O ${JDBC_DRIVER_PATH}/${JDBC_DRIVER} \
        https://github.com/ClickHouse/clickhouse-jdbc/releases/download/v$clickHouseVersion/${JDBC_DRIVER} && \
    wget -O /tmp/httpcomponents-client.tar.gz \
        https://archive.apache.org/dist/httpcomponents/httpclient/binary/httpcomponents-client-${HTTP_CLIENT_VERSION}-bin.tar.gz && \
    tar -xzf /tmp/httpcomponents-client.tar.gz -C ${JDBC_DRIVER_PATH} \
        --strip-components=2 --wildcards '*/lib/*.jar' && \
    rm /tmp/httpcomponents-client.tar.gz
jelarum
  • 11
  • 2