
I'm trying to build a Flask app that uses Kafka as an interface. I used a Python connector, kafka-python, and a Docker image for Kafka, spotify/kafkaproxy.

Below is the docker-compose file.

version: '3.3'
services:
  kafka:
    image: spotify/kafkaproxy
    container_name: kafka_dev
    ports:
      - '9092:9092'
      - '2181:2181'
    environment:
      - ADVERTISED_HOST=0.0.0.0
      - ADVERTISED_PORT=9092
      - CONSUMER_THREADS=1
      - TOPICS=PROFILE_CREATED,IMG_RATED
      - ZK_CONNECT=kafka7zookeeper:2181/root/path
  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - '9000:5000'
    volumes:
      - ./flask-app:/app
    depends_on:
      - kafka

Below is the Python snippet I used to connect to Kafka. Here, I used the Kafka container's alias kafka to connect, as Docker takes care of mapping the alias to its IP address.

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']

consumer = KafkaConsumer(*TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)

I got a NoBrokersAvailable error. From this, I could understand that the Flask app could not find the Kafka server.

Traceback (most recent call last):
  File "./app.py", line 11, in <module>
    consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

Other Observations:

  1. I was able to run ping kafka from the Flask container and get packets from the Kafka container.
  2. When I run the Flask app locally and connect to the Kafka container by setting BOOTSTRAP_SERVERS = ['localhost:9092'], it works fine.
Shashank
  • for one possible solution see my answer here: https://stackoverflow.com/a/50525419/3224238. If connecting from outside the `docker-compose` network is not required, you can just add `hostname: kafka` in the kafka service definition and use it in `ADVERTISED_HOST` as well; your flaskapp should then be configured to connect to `kafka:9092` – Paizo Nov 27 '18 at 10:52
  • Suggestion: Edit your Python code to pull the bootstrap servers from an environment variable – OneCricketeer Nov 27 '18 at 18:39
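
Following OneCricketeer's suggestion in the comment above, here is a minimal sketch of pulling the bootstrap servers from an environment variable (the variable name KAFKA_BOOTSTRAP_SERVERS and the default value are my own assumptions, not from the question):

import os

from kafka import KafkaConsumer

# Hypothetical variable name; the fallback is only an example default
BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'kafka:9092').split(',')

consumer = KafkaConsumer('PROFILE_CREATED', 'IMG_RATED',
                         bootstrap_servers=BOOTSTRAP_SERVERS)

With this, the same image can point at localhost:9092 when run on the host and at the in-network listener when run under docker-compose, without any code changes.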

2 Answers


UPDATE

As mentioned by cricket_007, given that you are using the docker-compose provided below, you should use kafka:29092 to connect to Kafka from another container. So your code would look like this:

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']

consumer = KafkaConsumer(*TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)  # unpack the list: kafka-python expects topic names as separate arguments

END UPDATE
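
As a quick smoke test (my own addition, not part of the original update), you can iterate over the consumer created above and print whatever arrives on those topics:

# KafkaConsumer is iterable; this blocks and yields records as they arrive
for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)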

I would recommend you use the Kafka images from Confluent Inc.; they have all sorts of ready-to-use example setups with docker-compose, and they keep them updated.

Try this out:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - '9000:5000'
    volumes:
      - ./flask-app:/app

I used this docker-compose.yml and added your service on top of it. Please note that:

The config used here exposes port 9092 for external connections to the broker i.e. those from outside the docker network. This could be from the host machine running docker, or maybe further afield if you've got a more complicated setup. If the latter is true, you will need to change the value 'localhost' in KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those remote clients
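
In concrete terms for this compose file, the same broker is reached through a different address depending on where the client runs. A small sketch (the topic name is taken from the question):

from kafka import KafkaConsumer

# From the host machine, use the PLAINTEXT_HOST listener published on port 9092
consumer = KafkaConsumer('PROFILE_CREATED', bootstrap_servers=['localhost:9092'])

# From another container on the compose network, use the internal listener instead:
# consumer = KafkaConsumer('PROFILE_CREATED', bootstrap_servers=['kafka:29092'])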

Make sure you check out the other examples; they may be useful, especially when moving to production environments: https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples

Also worth checking:

It seems that you need to specify the api_version to avoid this error. For more details check here.

Version 1.3.5 of this library (which is the latest on PyPI) only lists certain API versions 0.8.0 to 0.10.1. So unless you explicitly specify api_version to be (0, 10, 1), the client library's attempt to discover the version will cause a NoBrokersAvailable error.

# URL, CLIENT_ID and JsonSerializer come from the quoted example and are placeholders here
producer = KafkaProducer(
    bootstrap_servers=URL,
    client_id=CLIENT_ID,
    value_serializer=JsonSerializer.serialize,
    api_version=(0, 10, 1)  # pinning the version skips the broker version probe
)
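
For completeness, here is a hypothetical send with that producer (the topic comes from the question; the payload is made up and just needs to be something the configured serializer can handle):

# Send a sample event and block until the broker acknowledges it
future = producer.send('PROFILE_CREATED', {'user_id': 42})
record_metadata = future.get(timeout=10)
print(record_metadata.topic, record_metadata.partition, record_metadata.offset)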

This should work; interestingly enough, setting the api_version is accidentally fixing the issue, according to this:

When you set api_version the client will not attempt to probe brokers for version information. So it is the probe operation that is failing. One large difference between the version probe connections and the general connections is that the former only attempts to connect on a single interface per connection (per broker), whereas the latter -- general operation -- will cycle through all interfaces continually until a connection succeeds. #1411 fixes this by switching the version probe logic to attempt a connection on all found interfaces.

The actual issue is described here.
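
Applied to the consumer from the question, that workaround would look roughly like this (a sketch; as the quote above notes, pinning api_version mainly sidesteps the version probe rather than fixing the underlying connectivity):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'PROFILE_CREATED', 'IMG_RATED',
    bootstrap_servers=['kafka:29092'],
    api_version=(0, 10, 1),  # skip the broker version probe described above
)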

lloiacono
  • Thank you for the prompt response, @lloiacono. Unfortunately, even with confluentinc/cp-kafka, the issue still prevails. – Shashank Nov 27 '18 at 10:18
  • @Shashank are you getting the same error? Have you changed the Python snippet you posted above? Could you go inside your flask container and execute this: `nc -vz kafka 9092` Note that you might need to install netcat. – lloiacono Nov 27 '18 at 10:23
  • No, I didn't change the Python snippet. After running the command, `nc -vz kafka 1-10000` from inside the Flask container, I got the following output. `kafka [192.168.80.2] 2181 open kafka [192.168.80.2] 9092 open`. – Shashank Nov 27 '18 at 11:11
  • @Shashank Within a Flask container, you need to use `kafka:29092` with the above Compose file for Kafka – OneCricketeer Nov 27 '18 at 18:37
  • When I tried `api_version=(0, 10, 1)` for both producer and consumer, the issue shifted to `KafkaTimeoutError: Failed to update metadata after 60.0 secs.` I made sure to set `BOOTSTRAP_SERVERS = ['kafka:29092']` and tried `PLAINTEXT_HOST://localhost:9092` in a different iteration. Then I thought the problem could be because of not mentioning **TOPICS** explicitly. I referred to the [docs](https://docs.confluent.io/current/installation/docker/docs/config-reference.html#optional-confluent-enterprise-replicator-executable-settings) and added `WHITELIST` with topics, but couldn't make it work. – Shashank Nov 29 '18 at 05:41

I managed to get this up and running using a network named stream_net between all services.

# for local development
version: "3.7"
services:

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - stream_net

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - stream_net

  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - "9000:5000"
    volumes:
      - ./flask-app:/app
    networks:
      - stream_net
    depends_on:
      - kafka

networks:
  stream_net:

  • Connection from outside the containers: localhost:9092
  • Connection within the network: kafka:29092

Of course, it seems strange to put containers that already share a network into yet another network. But this way the containers can be addressed by their actual service names. Maybe someone can explain exactly how this works, or it may help someone else to understand the core of the problem and solve it properly.
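
To see the name resolution that makes this work, here is a small check you could run from inside the flaskapp container (my own sketch, not part of the answer):

import socket

from kafka import KafkaConsumer

# Docker's embedded DNS resolves the compose service name to the container's IP
print(socket.gethostbyname('kafka'))

# Connect via the internal listener and list the topics the broker knows about
consumer = KafkaConsumer(bootstrap_servers=['kafka:29092'])
print(consumer.topics())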

Joost Döbken