Say that I have a topic my-topic and a group my-group, and that my clients use the confluent-kafka Python package. Consumers are configured with "auto.offset.reset": "earliest" to ensure that all messages are processed at least once. Now say I accidentally add 10,000 malformed messages to my-topic during early development. As a one-time administrative action, I want to seek my-group's offsets to the end of each partition so that I never see those messages again. I don't care if extra messages slip into the topic in the meantime and I end up skipping more messages than necessary. I also don't care whether other groups see those malformed messages.

Is this kind of "fast-forwarding" possible at the group level, maybe with AdminClient? Or am I stuck with shutting down the existing consumers, writing a script that creates a consumer in my-group, consuming messages and committing offsets until those messages are gone, closing that consumer, and restarting my real consuming process?
Sunderam Dubey

QuintusTheFoul
1 Answer
You can refer to this example of setting the offset directly on the partitions: https://github.com/confluentinc/confluent-kafka-python/issues/145#issuecomment-284843254
For example:
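import confluent_kafka

# Arbitrary example offset; confluent_kafka.OFFSET_END would instead start
# each partition at its end.
NEW_OFFSET = 666


def my_assign(consumer, partitions):
    # Override the starting offset of every partition handed to this consumer.
    for p in partitions:
        p.offset = NEW_OFFSET
    print('assign', partitions)
    consumer.assign(partitions)


if __name__ == '__main__':
    c = confluent_kafka.Consumer({
        "bootstrap.servers": "eden:9092",
        "group.id": "my-group"
    })
    c.subscribe(["my-topic"], on_assign=my_assign)
    try:
        while True:
            m = c.poll(1)
            if m is None:
                continue
            if m.error() is None:
                print('Received message', m)
    except KeyboardInterrupt:
        pass
    finally:
        # Leave the group cleanly on Ctrl-C.
        c.close()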
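If the goal is only to move my-group to the end of my-topic, a one-off script can probably commit the end offsets directly instead of polling. The following is a rough sketch, not a tested recipe: the function names `end_offsets_from_watermarks` and `fast_forward_group` are made up for illustration, and I am assuming the real consumers are stopped while it runs, since as far as I know the broker only accepts such a commit when the group has no active members.

```python
def end_offsets_from_watermarks(watermarks):
    """Map {partition: (low, high)} to {partition: high}.

    The high watermark is the offset of the next message to be written, so
    committing it makes the group resume after everything currently in the log.
    """
    return {partition: high for partition, (low, high) in watermarks.items()}


def fast_forward_group(bootstrap_servers, group_id, topic, timeout=10.0):
    """Commit the current end offset of every partition of `topic` for `group_id`.

    Hypothetical helper: assumes the group's regular consumers are shut down.
    """
    # Imported lazily so the pure helper above can be used without the package.
    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "enable.auto.commit": False,  # only the explicit commit below should apply
    })
    try:
        # Discover the topic's partitions without subscribing to it.
        metadata = consumer.list_topics(topic, timeout=timeout)
        watermarks = {}
        for partition in metadata.topics[topic].partitions:
            marks = consumer.get_watermark_offsets(
                TopicPartition(topic, partition), timeout=timeout
            )
            if marks is None:  # get_watermark_offsets returns None on timeout
                raise RuntimeError(f"timed out fetching offsets for partition {partition}")
            watermarks[partition] = marks

        targets = end_offsets_from_watermarks(watermarks)
        offsets = [TopicPartition(topic, p, o) for p, o in sorted(targets.items())]
        # Synchronous commit so that any failure surfaces here as an exception.
        return consumer.commit(offsets=offsets, asynchronous=False)
    finally:
        consumer.close()
```

Usage would be something like `fast_forward_group("eden:9092", "my-group", "my-topic")`, followed by restarting the real consumers. Messages produced between the watermark lookup and the restart are still delivered, because the committed position is a fixed offset rather than "latest".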

Sergey Antopolskiy
Won't this change the offset as it polls? Is there a way to do this as a prep action for when the consumers turn on at a later time? – user4446237 Apr 04 '23 at 11:48
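As a possible prep action that never polls: recent confluent-kafka releases (2.x, if I remember correctly) expose AdminClient.alter_consumer_group_offsets, which sets a group's committed offsets without creating a consumer. Below is a minimal sketch under a few assumptions: `checked_targets` and `alter_group_offsets` are hypothetical names, the caller already knows the target offset of each partition (for example from Consumer.get_watermark_offsets), and the group is inactive when the call is made, which I believe the broker requires.

```python
def checked_targets(end_offsets):
    """Return (partition, offset) pairs sorted by partition.

    Negative values are rejected because logical offsets such as
    confluent_kafka.OFFSET_END (-1) are placeholders; this sketch expects
    absolute offsets.
    """
    pairs = sorted(end_offsets.items())
    for partition, offset in pairs:
        if offset < 0:
            raise ValueError(f"partition {partition}: offset {offset} is not absolute")
    return pairs


def alter_group_offsets(bootstrap_servers, group_id, topic, end_offsets):
    """Set the committed offsets of `group_id` on `topic` to `end_offsets`.

    Hypothetical helper; `end_offsets` maps partition number to target offset.
    """
    # Imported lazily so the pure helper above can be used without the package.
    from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition
    from confluent_kafka.admin import AdminClient

    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    partitions = [
        TopicPartition(topic, partition, offset)
        for partition, offset in checked_targets(end_offsets)
    ]
    request = ConsumerGroupTopicPartitions(group_id, partitions)
    # The call returns a dict of futures keyed by group id.
    futures = admin.alter_consumer_group_offsets([request])
    # result() raises if the broker rejected the change.
    return futures[group_id].result()
```

For example, `alter_group_offsets("eden:9092", "my-group", "my-topic", {0: 10000})` would ask the broker to set partition 0 of my-topic to offset 10000 for my-group; consumers started afterwards resume from there and ignore "auto.offset.reset" because a committed offset exists.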