When trying to let an AIOKafkaConsumer
start reading messages from a specific offset starting_offset
, how do we know which partition to be used?
I am trying to use the AIOKafkaConsumer.seek
method, but it requires a TopicPartition to be specified in.
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
async def main():
topic = "test"
starting_offset = 3
# Publish some messages
producer = AIOKafkaProducer(bootstrap_servers="localhost:29092")
await producer.start()
for i in range(10):
await producer.send_and_wait(topic, bytes(f"hello {i}", "utf-8"))
# Start consuming from a specific offset
consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
await consumer.start()
consumer.seek(None, starting_offset)
while True:
message = await consumer.getone()
print("message:", message.value)
if __name__ == "__main__":
asyncio.run(main())