I'm just getting started with kafka, I have a k8s cluster in which I want to deploy event listeners. When I have one listener running, everything works fine, but with several pods they process events in parallel, I would like the event to be processed only once. How can I achieve this?
My listener code:
import asyncio, settings, json
from aiokafka import AIOKafkaConsumer
event_handler = {
"table_create": table_create_event,
"table_delete": table_delete_event,
}
consumer_config = [
{
"name": "main consumer1",
"topics": ["storage_create", "storage_update", "storage_delete",
"table_create", "table_update", "table_delete",
"field_create", "field_update", "field_delete",
"value_create", "value_update", "value_delete"],
"group_id": "cms_events"
},
{
"name": "main consume2r",
"topics": ["storage_create", "storage_update", "storage_delete",
"table_create", "table_update", "table_delete",
"field_create", "field_update", "field_delete",
"value_create", "value_update", "value_delete"],
"group_id": "cms_events"
}
]
async def consume(topics, group_id):
consumer = AIOKafkaConsumer(
*topics,
bootstrap_servers='localhost:9092',
group_id=group_id,
auto_offset_reset="earliest",
metadata_max_age_ms=30000,
)
await consumer.start()
try:
async for msg in consumer:
print(
"{}:{:d}:{:d}: key={} value={} timestamp_ms={}".format(
msg.topic, msg.partition, msg.offset, msg.key, msg.value,
msg.timestamp)
)
topic = msg.topic
encode_event_body = msg.value
decode_event_body = json.loads(encode_event_body)
try:
await event_handler[topic](decode_event_body)
except Exception as exc:
print(exc)
finally:
await consumer.stop()
async def main():
await asyncio.gather(*[
consume(topics=consumer.get("topics"), group_id=consumer.get("group_id"))
for consumer in consumer_config
]
)
if __name__ == "__main__":
asyncio.run(main())