When making an AIOKafkaConsumer start reading messages from a specific offset, starting_offset, how do we know which partition to use?

I am trying to use the AIOKafkaConsumer.seek method, but it requires a TopicPartition to be specified.

import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


async def main():
    topic = "test"
    starting_offset = 3
    
    # Publish some messages
    producer = AIOKafkaProducer(bootstrap_servers="localhost:29092")
    await producer.start()
    for i in range(10):
        await producer.send_and_wait(topic, bytes(f"hello {i}", "utf-8"))
    await producer.stop()  # close the producer once all messages are sent

    # Start consuming from a specific offset
    consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
    await consumer.start()
    consumer.seek(None, starting_offset)

    while True:
        message = await consumer.getone()
        print("message:", message.value)


if __name__ == "__main__":
    asyncio.run(main())
Athena Wisdom

1 Answer

Does your topic have only one partition? If so, use partition 0, i.e. TopicPartition(topic, 0), since partition numbers start at 0. Otherwise, there is no straightforward answer.

Each partition has its own independent offsets. You could seek every partition to the same offset, but that offset is not guaranteed to exist in all of them, and you would first need to loop over the partition numbers (see partitions_for_topic(topic)) and seek each partition individually.
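A minimal sketch of that per-partition loop, under a few assumptions: the topic is passed to the constructor as in the question, the consumer's assignment is already populated once start() returns, and seek() is called after start(). It uses consumer.assignment() rather than partitions_for_topic(topic) so that only partitions this consumer actually owns are sought. The names clamp_offset and consume_from are hypothetical helpers, not part of aiokafka; clamping to the range reported by beginning_offsets() and end_offsets() is one possible way to handle an offset that does not exist in a given partition.

```python
def clamp_offset(requested: int, beginning: int, end: int) -> int:
    """Clamp a requested offset into one partition's valid range [beginning, end]."""
    return max(beginning, min(requested, end))


async def consume_from(topic, starting_offset, bootstrap_servers="localhost:29092"):
    # Imported lazily so the pure helper above can be used without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    # Subscribing via the constructor; do not also call assign() in this mode.
    consumer = AIOKafkaConsumer(topic, bootstrap_servers=bootstrap_servers)
    await consumer.start()
    try:
        # Assumption: the assignment is available once start() has returned.
        partitions = list(consumer.assignment())

        # Each partition has its own offset range, so look them up per partition.
        beginning = await consumer.beginning_offsets(partitions)
        end = await consumer.end_offsets(partitions)

        for tp in partitions:
            # seek() takes a TopicPartition, not a bare partition number.
            consumer.seek(tp, clamp_offset(starting_offset, beginning[tp], end[tp]))

        while True:
            message = await consumer.getone()
            print(message.partition, message.offset, message.value)
    finally:
        await consumer.stop()
```

With a broker running, this could be started with asyncio.run(consume_from("test", 3)). For a single-partition topic the loop runs once, for TopicPartition("test", 0).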

OneCricketeer
  • If the `test` topic does not exist before running the above code, does `AIOKafkaProducer` by default create the new topic with only 1 partition? – Athena Wisdom Aug 05 '21 at 15:01
  • Getting the error `kafka.errors.IllegalStateError: IllegalStateError: No current assignment for partition 1` when using `consumer.seek(1, starting_offset)` – Athena Wisdom Aug 05 '21 at 15:03
  • If you have auto-topic creation enabled on the broker, the topic should get created, yes. The default partition count for that topic is defined in the broker's `server.properties` file. I'm not sure whether partitions start at `0` for that library, but you can try that as well – OneCricketeer Aug 05 '21 at 15:08
  • `consumer.partitions_for_topic(topic)` gives `{0}`, but if the next line is `consumer.seek(0, starting_offset)` then we get the error `kafka.errors.IllegalStateError: IllegalStateError: No current assignment for partition 0` – Athena Wisdom Aug 05 '21 at 15:26
  • You need to call assign() or subscribe() before you can seek – OneCricketeer Aug 05 '21 at 15:45
  • Tried running `consumer.assign([0])` but this gave the error `kafka.errors.IllegalStateError: IllegalStateError: Subscription to topics, partitions and pattern are mutually exclusive` – Athena Wisdom Aug 06 '21 at 04:08
  • Did you do that before, or after, `consumer.start()`? You'd need to seek before you start the consumer, and you need to assign/subscribe before that – OneCricketeer Aug 06 '21 at 04:18
  • Ohhh, I was calling `consumer.start()` before doing the assign/seek – Athena Wisdom Aug 10 '21 at 14:20