0

Hi I am using AioKafka consumer to read message published by another process. The other process has just published one message and my consumer code is reading same message infinitely. I tried to use manual commit but in vain. I am using library: https://pypi.org/project/aiokafka-commit/ I want to read each message only once as and when available.

from aiokafka import AIOKafkaConsumer
import asyncio
        
async def consume():
    consumer = AIOKafkaConsumer('websocket_chat_kafka',
                                bootstrap_servers = "127.0.0.1:9092", 
                                group_id = 'gid',
                                client_id = 'cid',
                                # enable_auto_commit = True,
                                # auto_commit_interval_ms=1000,
                                auto_offset_reset="latest")

    # await consumer.start()
    # while True:
    #     msg = await consumer.getone()
    #     print(msg)
    #     await consumer.commit()

    await consumer.start()
    # await consumer.seek_to_committed()
    
    async for msg in consumer:
        print(msg)
        await consumer.commit()

    await consumer.stop()

asyncio.run(consume())

Any ideas what am I doing wrong?

Thank you.

0 Answers0