I created a Kafka cluster with 3 brokers, 1 ZooKeeper node, a Schema Registry, and 1 Kafka Connect worker using Docker Compose. Below is my setup:


```yaml
networks:
  default:
    name: kafka-connect-with-three-brokers

services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:5.4.2
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    restart: unless-stopped

  broker1:
    container_name: broker1
    image: confluentinc/cp-kafka:5.4.2
    hostname: broker1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://broker1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_NUM_PARTITIONS: 4
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
    restart: always

  broker2:
    container_name: broker2
    image: confluentinc/cp-kafka:5.4.2
    hostname: broker2
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://broker2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_NUM_PARTITIONS: 4
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
    restart: always

  broker3:
    container_name: broker3
    image: confluentinc/cp-kafka:5.4.2
    hostname: broker3
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://broker3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_NUM_PARTITIONS: 4
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
    restart: always

  schema-registry:
    container_name: schema-registry
    image: confluentinc/cp-schema-registry:5.4.2
    hostname: schema-registry
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://broker1:19092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    restart: always

  kafka-connect:
    image: kafka-connect-jdbc
    container_name: kafka-connect
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_BOOTSTRAP_SERVERS: "broker1:9092"
      CONNECT_REST_PORT: http://kafka-connect:8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: kafka-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
```
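As a sanity check that the three brokers actually form a cluster on the Docker network, something like the following can be run. The service names and the internal listener ports come from the compose file above; the availability of the `kafka-topics` and `zookeeper-shell` CLIs inside the `cp-kafka`/`cp-zookeeper` images is an assumption.

```bash
# List topics via broker1's internal (Docker-network) listener; this only
# succeeds if the broker is up and reachable under its advertised name.
docker-compose exec broker1 kafka-topics --bootstrap-server broker1:19092 --list

# Check which broker IDs have registered in ZooKeeper (expected: 1, 2, 3).
docker-compose exec zookeeper zookeeper-shell localhost:2181 ls /brokers/ids
```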

I am using the Dockerfile below to install the required connector plugin:

```dockerfile
FROM confluentinc/cp-kafka-connect-base:5.0.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.5.0
```
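For context, the custom image referenced by the `kafka-connect` service can be built from this Dockerfile along these lines. The tag `kafka-connect-jdbc` matches the `image:` name in the compose file; the exact build invocation is an assumption.

```bash
# Build the Connect worker image with the JDBC connector installed,
# tagged so the compose file's kafka-connect service picks it up.
docker build -t kafka-connect-jdbc .
```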

I built an image named `kafka-connect-jdbc` from this Dockerfile and used it for the `kafka-connect` service. However, when I start the service with `docker-compose up -d`, the brokers do not seem to be reachable from Kafka Connect. When I check the logs with `docker-compose logs kafka-connect`, I see the following error:

```
kafka-connect      | [kafka-admin-client-thread | adminclient-1] WARN org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node -1 could not be established. Broker may not be available.
kafka-connect      | [kafka-admin-client-thread | adminclient-1] WARN org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node -1 could not be established. Broker may not be available.
kafka-connect      | [main] ERROR io.confluent.admin.utils.ClusterStatus - Error while getting broker list.
kafka-connect      | java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
kafka-connect      |    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
kafka-connect      |    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
kafka-connect      |    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
kafka-connect      |    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
kafka-connect      |    at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:149)
kafka-connect      |    at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:150)
kafka-connect      | Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
kafka-connect      | [kafka-admin-client-thread | adminclient-1] WARN org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node -1 could not be established. Broker may not be available.
kafka-connect      | [main] INFO io.confluent.admin.utils.ClusterStatus - Expected 1 brokers but found only 0. Trying to query Kafka for metadata again ...
kafka-connect      | [main] ERROR io.confluent.admin.utils.ClusterStatus - Expected 1 brokers but found only 0. Brokers found [].
```
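To narrow down whether this is a network problem or a configuration problem, one rough check is to probe the brokers' internal listener ports from inside the `kafka-connect` container. The hostnames and ports come from the compose file above; this assumes `bash` is available in the image, which the Confluent base images provide.

```bash
# Pure-bash TCP probes from inside the kafka-connect container against each
# broker's internal (Docker-network) listener; prints a line per reachable port.
docker-compose exec kafka-connect bash -c '
  for hp in broker1/19092 broker2/19093 broker3/19094; do
    (exec 3<>/dev/tcp/${hp%/*}/${hp#*/}) && echo "${hp} reachable"
  done'
```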
  • From inside of the zookeeper container, are brokers reachable at the intended port? try `telnet [broker ip] [broker port]` – Rm4n Jun 30 '22 at 08:40
  • Look at `SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS`. Look what port that uses. Now, look at `CONNECT_BOOTSTRAP_SERVERS` and see what port it uses. The schema registry is correct. Connect is not. Also, you really do **not** need multiple brokers or Zookeepers. Also unrelated, but `cp-kafka-connect-base:5.4.2` would be a better base image since it is the same version as the other confluent containers – OneCricketeer Jun 30 '22 at 19:10
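Building on the port-mismatch comment above, a minimal sketch of the corrected Connect settings would look like this. The 19092/19093/19094 ports are the brokers' internal `LISTENER_DOCKER_INTERNAL` listeners from the compose file; listing all three brokers is optional but gives the worker fallback bootstrap servers. Unrelated to the connection error, `CONNECT_REST_PORT` should also be a plain port number rather than a URL.

```yaml
      # Use the brokers' internal Docker-network listeners so the advertised
      # addresses returned to Connect are resolvable inside the Compose network.
      CONNECT_BOOTSTRAP_SERVERS: "broker1:19092,broker2:19093,broker3:19094"
      # rest.port expects a port number, not a URL.
      CONNECT_REST_PORT: 8083
```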

0 Answers