0

Here's run.py

from app import create_app

if __name__ == '__main__':
    import os

    app = create_app()
    # The Werkzeug debugger lets anyone who can reach the server execute
    # arbitrary Python, so it must not be switched on unconditionally while
    # listening on all interfaces. Opt in with FLASK_DEBUG=1 during local
    # development only.
    debug = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes')
    # host='0.0.0.0' binds to every network interface, not just loopback.
    app.run(host='0.0.0.0', port=8000, debug=debug)

Here's the /app/app/__init__.py

from flask import Flask
from flask_cors import CORS
from app.config import Config
from threading import Thread
from app.consumer.kafka_consumer import consume_messages

def create_app():
    """Build and configure the Flask application.

    Loads settings from ``Config``, enables CORS, starts the Kafka consumer
    in a background thread and registers the segmentation blueprint.

    Returns:
        The configured ``Flask`` application instance.
    """
    app = Flask(__name__)
    app.config.from_object(Config)
    app.config['SQLALCHEMY_DATABASE_URI'] = Config.MYSQL_DATABASE_URI
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

    CORS(app)

    # consume_messages() iterates the Kafka consumer with no exit condition.
    # A non-daemon thread would keep the interpreter alive after the web
    # server stops, so mark it as a daemon and name it for easier debugging.
    # NOTE(review): every call to create_app() starts another consumer thread;
    # under the Flask reloader the factory runs in both the parent and the
    # child process - confirm that duplicate consumers are acceptable.
    consumer_thread = Thread(
        target=consume_messages,
        name='kafka-consumer',
        daemon=True,
    )
    consumer_thread.start()

    # Register blueprints
    from app.controller.segmentation_controller import segmentation_bp
    app.register_blueprint(segmentation_bp)

    return app

Here's the /app/app/consumer/kafka_consumer.py

from kafka import KafkaConsumer
from app.config import Config
from app.service.segmentation_service import SegmentationService


def consume_messages():
    """Consume messages from the configured Kafka topic and process them.

    Creates a ``SegmentationService`` and a ``KafkaConsumer`` subscribed to
    ``Config.KAFKA_TOPIC``, then blocks while iterating the consumer, passing
    every received message to ``SegmentationService.process_messages``.

    A failure while processing a single message is logged and the loop moves
    on to the next message. The consumer is always closed when the loop ends,
    whether normally or through an exception raised by the consumer itself.
    """
    import logging

    logger = logging.getLogger(__name__)

    segmentation_service = SegmentationService()
    consumer = KafkaConsumer(
        Config.KAFKA_TOPIC,
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='latest',  # Start reading from the latest available offset
        enable_auto_commit=True,
        group_id='my-group',
        value_deserializer=lambda x: x.decode('utf-8'),
    )
    try:
        for message in consumer:
            try:
                segmentation_service.process_messages(message)
            except Exception:
                # Boundary of the consumer loop: without this, one bad message
                # would silently terminate the thread running this function
                # while the rest of the application keeps going.
                logger.exception(
                    'Failed to process Kafka message (topic=%s, partition=%s, offset=%s)',
                    message.topic,
                    message.partition,
                    message.offset,
                )
    finally:
        # Release sockets and leave the consumer group cleanly.
        consumer.close()

Here's the error message

2023-06-14 11:44:43 Traceback (most recent call last):
2023-06-14 11:44:43   File "run.py", line 1, in <module>
2023-06-14 11:44:43     from app import create_app
2023-06-14 11:44:43   File "/app/app/__init__.py", line 5, in <module>
2023-06-14 11:44:43     from app.consumer.kafka_consumer import consume_messages
2023-06-14 11:44:43   File "/app/app/consumer/kafka_consumer.py", line 1, in <module>
2023-06-14 11:44:43     from kafka import KafkaConsumer
2023-06-14 11:44:43   File "/usr/local/lib/python3.8/site-packages/kafka/__init__.py", line 23, in <module>
2023-06-14 11:44:43     from kafka.producer import KafkaProducer
2023-06-14 11:44:43   File "/usr/local/lib/python3.8/site-packages/kafka/producer/__init__.py", line 4, in <module>
2023-06-14 11:44:43     from .simple import SimpleProducer
2023-06-14 11:44:43   File "/usr/local/lib/python3.8/site-packages/kafka/producer/simple.py", line 54
2023-06-14 11:44:43     return '<SimpleProducer batch=%s>' % self.async
OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
Nabih Bawazir
  • 6,381
  • 7
  • 37
  • 70

1 Answer

1

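I resolved this error by uninstalling the legacy `kafka` package and installing `kafka-python` in its place. The old `kafka` distribution still uses `async` as an attribute name (`self.async`), and `async` has been a reserved keyword since Python 3.7, so importing it on Python 3.8 fails with a `SyntaxError`.

    pip uninstall kafka
    pip install -U kafka-python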

User
  • 46
  • 1
  • 2
  • 11