You need to call reset_offsets(). For example:
consumer = topic.get_simple_consumer(consumer_group="example")
partition_offset_pairs = [(p, get_offset_for_partition(p)) for p in consumer.partitions.values()]
# because we passed in a consumer_group the new offsets will be saved in Kafka
consumer.reset_offsets(partition_offsets=partition_offset_pairs)
(where get_offset_for_partition() is a function you define). Or, for a single-partition topic:
# read from offset 123456
consumer = topic.get_simple_consumer()
partition = topic.partitions[0]
consumer.reset_offsets([(partition, 123456)])
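As a rough idea of what get_offset_for_partition() might look like, here is a minimal sketch that looks offsets up in a dict keyed by partition id. SAVED_OFFSETS and FakePartition are made up for illustration (the stand-in only mimics the partition's id attribute so the snippet runs without a broker); in practice the offsets would come from wherever you store them.

```python
from collections import namedtuple

# Hypothetical stand-in for a pykafka partition object; only `id` is used here.
FakePartition = namedtuple("FakePartition", ["id"])

# Hypothetical store of partition id -> offset to reset to.
SAVED_OFFSETS = {0: 123456, 1: 98765}


def get_offset_for_partition(partition):
    """Return the stored offset for this partition.

    Raises KeyError if no offset was stored for the partition id.
    """
    return SAVED_OFFSETS[partition.id]


# Build the (partition, offset) pairs in the shape passed to reset_offsets().
partitions = [FakePartition(0), FakePartition(1)]
partition_offset_pairs = [(p, get_offset_for_partition(p)) for p in partitions]
```

With a real consumer you would iterate over consumer.partitions.values() instead of the stand-in list.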
The same reset_offsets() method is also available on the BalancedConsumer and ManagedBalancedConsumer classes.
Note that, by Kafka's design, message ordering is only guaranteed within each partition of a topic, not across partitions.
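To make the ordering point concrete, here is a small sketch that checks whether a stream of messages is in offset order within each partition, even if partitions are interleaved. Msg is a hypothetical stand-in carrying partition_id and offset fields, and ordered_within_partitions() is a helper invented for this example, not part of any library.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumed message.
Msg = namedtuple("Msg", ["partition_id", "offset", "value"])


def ordered_within_partitions(messages):
    """Return True if offsets strictly increase within each partition."""
    last_offset = {}
    for m in messages:
        prev = last_offset.get(m.partition_id)
        if prev is not None and m.offset <= prev:
            return False
        last_offset[m.partition_id] = m.offset
    return True


# Partitions 0 and 1 are interleaved, but each one is in order on its own.
interleaved = [
    Msg(0, 10, b"a"),
    Msg(1, 3, b"x"),
    Msg(0, 11, b"b"),
    Msg(1, 4, b"y"),
]
```

So after resetting offsets on a multi-partition topic, expect messages from different partitions to arrive interleaved; only the per-partition sequence is meaningful.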