
Introduction

I am currently experimenting with Kafka and FastAPI, trying to build a template that lets me quickly write programs in a microservice pattern.

Goal - Vision

Building a repository of design patterns that implement very simple microservice infrastructures. The examples should only demonstrate how messages are sent between services and let users plug in their own code without spending a lot of time on setup.

Motivation

I searched a lot but was not able to find simple examples. Most examples are highly customized and do not really generalize.

Tech Stack

  • Kafka
  • FastAPI
  • Docker

Open to other implementations

Please let me know if you have any other recommendations. I am quite new to microservice architectures and would be very happy to explore further designs.

Current Problem

My current template spins up a Zookeeper, Kafka, consumer, and producer service. However, I am encountering an issue where my consumer is not able to consume the messages generated by my producer. The producer itself seems to work fine and successfully publishes messages, which I have confirmed with:

docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my-topic --from-beginning

My consumer does not appear to do anything at all.

Thank you in advance for all your suggestions on this issue.

my folder structure (reconstructed from the build contexts below; the original screenshot is unavailable):

.
├── docker-compose.yml
├── producer
│   ├── Dockerfile
│   ├── main.py
│   └── requirements.txt
└── consumer
    ├── Dockerfile
    ├── main.py
    └── requirements.txt

my docker-compose file:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888

  kafka:
    image: confluentinc/cp-kafka:latest
    restart: "no"
    links:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL


  producer:
    build: ./producer
    ports:
      - '8000:8000'
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
    depends_on:
      - kafka

  consumer:
    build: ./consumer
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
      - KAFKA_GROUP_ID=my-group
    depends_on:
      - kafka

  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
    ports:
      - 9000:9000
    depends_on:
      - kafka

my producer Dockerfile:

FROM python:3.8-slim-buster

WORKDIR /app

# install the dependencies first so the pip layer is cached across code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

my producer requirements.txt:

fastapi
uvicorn
confluent-kafka

my producer main.py:

import json
from confluent_kafka import Producer
from fastapi import FastAPI


app = FastAPI()

producer_conf = {
    'bootstrap.servers': 'kafka:9092',
    'client.id': 'my-app'
}

producer = Producer(producer_conf)

@app.post("/produce")
def produce(data: dict):
    try:
        data = json.dumps(data).encode('utf-8')
        producer.produce('my-topic', value=data)
        producer.flush()
        return {"status": "success", "message": data}
    except Exception as e:
        return {"status": "error", "message": str(e)}

my consumer Dockerfile:

FROM python:3.8-slim-buster

WORKDIR /app

# install the dependencies first so the pip layer is cached across code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "main.py"]

my consumer requirements.txt:

confluent-kafka

my consumer main.py:

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'kafka:9092',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    'group.id': 'my-group',
    'api.version.request': True,
    'api.version.fallback.ms': 0
}

def consume_messages():
    consumer = Consumer(conf)

    consumer.subscribe(['my-topic'])

    try:
        while True:
            msg = consumer.poll(1.0)

            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print(f'Reached end of partition: {msg.topic()}[{msg.partition()}]')
                else:
                    print(f'Error while consuming messages: {msg.error()}')
            else:
                print(f"Received message: {msg.value().decode('utf-8')}")

    except Exception as e:
        print(f"Exception occurred while consuming messages: {e}")
    finally:
        consumer.close()

def startup():
    consume_messages()

if __name__ == "__main__":
    try:
        print("Starting consumer...")
        startup()
    except Exception as e:
        print(f"Exception occurred: {e}")

Start the stack with:

docker-compose up

You can trigger the producer with this curl:

 curl -X POST http://localhost:8000/produce -H "Content-Type: application/json" -d '{"key": "nice nice nice"}'

I tried to rewrite the consumer multiple times and changed ports and docker-compose configurations. Unfortunately, I am unable to pinpoint my issue.

  • can you please share `requirements.txt` for producer and consumer and `docker-compose` commands you used to raise the stack? – rok Mar 25 '23 at 13:30
  • Hey @rok, I updated the post and included more detail. – mm117 Mar 25 '23 at 13:58
  • ok, thanks. Please note that `kafka-python` is an old package, not developed since 2020. In addition, `docker-compose up` fails to raise both producer and consumer... They exit with error code 1 with an exception... – rok Mar 25 '23 at 14:01
  • @rok Interesting, I am able to compose the stack actually. Would you recommend confluent-kafka instead? – mm117 Mar 25 '23 at 14:06
  • yes, exactly, as it's supported by Confluent – rok Mar 25 '23 at 14:08
  • @rok, is confluent-kafka open source? I know that confluent is also offering a paid service. My goal is to develop templates that can be implemented by anyone without the need to create an account on some website etc. Do you have an idea of how to convert my code in the above? Many thanks for your input. – mm117 Mar 25 '23 at 14:11
  • Yes the Confluent Python library is open source, but that has nothing to do with the problem here. If you want to use asyncio, use aiokafka – OneCricketeer Mar 25 '23 at 18:13
  • @OneCricketeer, could asyncio cause this issue? Just switched the code to confluent-kafka. Will update the code in the above shortly. I am now getting an error from my consumer container saying: thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Connect to ipv4#172.19.0.3:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed) – mm117 Mar 25 '23 at 18:56
  • 1) Kafka broker doesn't start immediately, so neither should your consumer 2) Please see https://stackoverflow.com/questions/51630260/connect-to-kafka-running-in-docker – OneCricketeer Mar 25 '23 at 22:45
  • @OneCricketeer, please excuse my basic questions on this but how do I delay the startup of my producer and consumer? Had a look at the link and played further around with my configs but still was not successful. I added Kafdrop to have some more visibility. It's quite odd to me that my producer appears to work as intended. Only the consumer causes issues...thanks again for all the help – mm117 Mar 27 '23 at 23:49
  • Producers buffer data and do not send immediately to the brokers, so they don't need the server to be available immediately. The consumer, on the other hand, needs to query for offsets, which may not be available yet... In docker-compose, `depends_on` does not "wait". You need to add `time.sleep(10)` for example before you call your startup function – OneCricketeer Mar 28 '23 at 18:11
  • @OneCricketeer amazing! this finally worked. Thanks a lot for this. Will post the solution here and create a repo to collect and publish further designs + improvements. Would love to get further feedback from you on this :) – mm117 Mar 29 '23 at 23:02

1 Answer


Special thanks to @OneCricketeer for helping me get this up and running!

Repository of microservice templates (contains this solution). Please feel free to contribute here:

https://github.com/maxmekiska/micro-templates


my folder structure: unchanged from the question above.

my docker-compose file:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888

  kafka:
    image: confluentinc/cp-kafka:latest
    restart: "no"
    links:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL

  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    environment:
      KAFKA_BROKERCONNECT: "kafka:29092"
    ports:
      - 9000:9000
    depends_on:
      - kafka

  producer:
    build: ./producer
    ports:
      - '8000:8000'
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
    depends_on:
      - kafka

  consumer:
    build: ./consumer
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
      - KAFKA_GROUP_ID=my-group
    depends_on:
      - kafka
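
As an alternative to sleeping in the consumer, newer Docker Compose releases can gate container startup on a broker health check. A minimal sketch (the probe command and timings are my assumptions, not part of the original setup):

  kafka:
    # ... as above, plus:
    healthcheck:
      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9092"]
      interval: 10s
      timeout: 10s
      retries: 10

  consumer:
    # ... as above, but with:
    depends_on:
      kafka:
        condition: service_healthy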

my producer Dockerfile:

FROM python:3.8-slim-buster

WORKDIR /app

# install the dependencies first so the pip layer is cached across code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

my producer requirements.txt:

fastapi
uvicorn
confluent-kafka

my producer main.py:

import json
from confluent_kafka import Producer
from fastapi import FastAPI


app = FastAPI()

producer_conf = {
    'bootstrap.servers': 'kafka:29092',
    'client.id': 'my-app'
}

producer = Producer(producer_conf)

@app.post("/produce")
def produce(data: dict):
    producer.produce('my-topic', value=json.dumps(data).encode('utf-8'))
    producer.flush()
    return {"status": "success"}

my consumer Dockerfile:

FROM python:3.8-slim-buster

WORKDIR /app

# install the dependencies first so the pip layer is cached across code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "main.py"]

my consumer requirements.txt:

confluent-kafka

my consumer main.py:

from confluent_kafka import Consumer, KafkaError
import time
import logging

logging.basicConfig(level=logging.DEBUG)

conf = {
    'bootstrap.servers': 'kafka:29092',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    'group.id': 'my-group',
    'api.version.request': True,
    'api.version.fallback.ms': 0
}

def consume_messages():
    consumer = Consumer(conf)

    consumer.subscribe(['my-topic'])

    try:
        while True:
            msg = consumer.poll(1.0)

            if msg is None:
                logging.debug("No message")
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    logging.info(f'Reached end of partition: {msg.topic()}[{msg.partition()}]')
                else:
                    logging.error(f'Error while consuming messages: {msg.error()}')
            else:
                logging.info(f"Received message: {msg.value().decode('utf-8')}")

    except Exception as e:
        logging.exception(f"Exception occurred while consuming messages: {e}")
    finally:
        consumer.close()
        logging.info("Consumer closed")


def startup():
    logging.info("Starting consumer...")
    # depends_on only orders container startup; it does not wait for the
    # broker to be ready, so give it time to come up (see comments above)
    time.sleep(30)
    consume_messages()

if __name__ == "__main__":
    try:
        startup()
    except Exception as e:
        print(f"Exception occurred: {e}")

Start the stack with:

docker-compose up

You can trigger the producer with this curl:

 curl -X POST http://localhost:8000/produce -H "Content-Type: application/json" -d '{"key": "nice nice nice"}'
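
If everything is wired up correctly, the endpoint returns {"status": "success"} and the message appears in the consumer container's logs (and in Kafdrop at http://localhost:9000).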
  • You may want a try-except around the producer actions and not default to return `success` (see the sketch after these comments) – OneCricketeer Mar 30 '23 at 17:21
  • @OneCricketeer good point, will update this accordingly – mm117 Mar 30 '23 at 18:13
  • @mm117 glad you made it. You can find a similar implementation of [Kafka Producer and Consumer in Python](https://rokpoto.com/kafka-producer-consumer-in-python/) at my blog – rok Apr 01 '23 at 16:30
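
Following up on the try/except suggestion above, a sketch of the produce endpoint with error handling instead of an unconditional success response (a sketch under the same setup, not the final repository code):

from confluent_kafka import KafkaException

@app.post("/produce")
def produce(data: dict):
    try:
        # produce() raises BufferError if the local queue is full;
        # client/broker-level failures surface as KafkaException
        producer.produce('my-topic', value=json.dumps(data).encode('utf-8'))
        producer.flush()
    except (KafkaException, BufferError) as e:
        return {"status": "error", "message": str(e)}
    return {"status": "success"}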