Here's run.py
from app import create_app
if __name__ == '__main__':
    # Development entry point: build the Flask app and serve it on port 8000
    # on all interfaces.
    app = create_app()
    # With debug=True Flask enables the Werkzeug reloader by default, which
    # re-executes this script in a child process. create_app() would then run
    # in both the parent and the child, so any background work it starts
    # (e.g. a message-consumer thread) would be started twice. Disabling the
    # reloader keeps a single application instance per server.
    # NOTE(review): debug=True combined with host='0.0.0.0' exposes the
    # interactive debugger on every network interface -- development only.
    app.run(host='0.0.0.0', port=8000, debug=True, use_reloader=False)
Here's /app/app/__init__.py
from flask import Flask
from flask_cors import CORS
from app.config import Config
from threading import Thread
from app.consumer.kafka_consumer import consume_messages
def create_app():
    """Build and configure the Flask application.

    Loads settings from ``Config``, enables CORS, starts the Kafka consumer
    loop (``consume_messages``) in a background thread, and registers the
    segmentation blueprint.

    Returns:
        The configured ``Flask`` application instance.
    """
    app = Flask(__name__)
    app.config.from_object(Config)
    app.config['SQLALCHEMY_DATABASE_URI'] = Config.MYSQL_DATABASE_URI
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    CORS(app)

    # consume_messages() blocks on the Kafka consumer iterator and does not
    # return on its own. Run it as a daemon thread so it cannot keep the
    # interpreter alive after the web server has stopped (a non-daemon thread
    # would make the process hang on shutdown).
    consumer_thread = Thread(target=consume_messages, daemon=True)
    consumer_thread.start()

    # Register blueprints
    # NOTE(review): imported inside the function rather than at module top --
    # presumably to avoid a circular import; confirm before moving it.
    from app.controller.segmentation_controller import segmentation_bp
    app.register_blueprint(segmentation_bp)
    return app
Here's /app/app/consumer/kafka_consumer.py
from kafka import KafkaConsumer
from app.config import Config
from app.service.segmentation_service import SegmentationService
def consume_messages():
    """Consume records from ``Config.KAFKA_TOPIC`` and process each one.

    Creates a ``SegmentationService`` and a ``KafkaConsumer`` subscribed to
    the configured topic, then hands every received record to
    ``SegmentationService.process_messages``. The call blocks for as long as
    the consumer keeps iterating, so it is meant to run in its own thread.

    A failure while processing one record is logged and the loop continues
    with the next record, so a single bad message does not silently stop the
    consumer. The consumer is always closed when the loop ends.

    Note: this module needs a Kafka client release that supports
    Python 3.7+. Older releases use ``async`` as an attribute name in
    ``kafka/producer/simple.py``; ``async`` is a reserved word on modern
    Python, so ``from kafka import KafkaConsumer`` fails at import time.
    Installing a current ``kafka-python`` release resolves that.
    """
    import logging

    logger = logging.getLogger(__name__)

    segmentation_service = SegmentationService()
    consumer = KafkaConsumer(
        Config.KAFKA_TOPIC,
        # NOTE(review): hard-coded broker address. If this runs inside a
        # container, 'localhost' refers to the container itself -- confirm
        # the broker is reachable at this address in the deployment.
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='latest',  # Start reading from the latest available offset
        enable_auto_commit=True,
        group_id='my-group',
        value_deserializer=lambda x: x.decode('utf-8'),
    )
    try:
        for message in consumer:
            try:
                segmentation_service.process_messages(message)
            except Exception:
                # Boundary of the background worker: log with traceback and
                # keep consuming instead of letting the thread die.
                logger.exception(
                    "Failed to process Kafka message "
                    "(topic=%s partition=%s offset=%s)",
                    message.topic,
                    message.partition,
                    message.offset,
                )
    finally:
        # Release sockets and leave the consumer group cleanly.
        consumer.close()
Here's the error message
2023-06-14 11:44:43 Traceback (most recent call last):
2023-06-14 11:44:43 File "run.py", line 1, in <module>
2023-06-14 11:44:43 from app import create_app
2023-06-14 11:44:43 File "/app/app/__init__.py", line 5, in <module>
2023-06-14 11:44:43 from app.consumer.kafka_consumer import consume_messages
2023-06-14 11:44:43 File "/app/app/consumer/kafka_consumer.py", line 1, in <module>
2023-06-14 11:44:43 from kafka import KafkaConsumer
2023-06-14 11:44:43 File "/usr/local/lib/python3.8/site-packages/kafka/__init__.py", line 23, in <module>
2023-06-14 11:44:43 from kafka.producer import KafkaProducer
2023-06-14 11:44:43 File "/usr/local/lib/python3.8/site-packages/kafka/producer/__init__.py", line 4, in <module>
2023-06-14 11:44:43 from .simple import SimpleProducer
2023-06-14 11:44:43 File "/usr/local/lib/python3.8/site-packages/kafka/producer/simple.py", line 54
2023-06-14 11:44:43 return '<SimpleProducer batch=%s>' % self.async