How can we handle offset corruption?
I want to save the offset logs somewhere else, or maybe take snapshots of the offsets. How can I do this?
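Kafka stores committed offsets in an internal topic named __consumer_offsets. Consumers commit their offsets to this topic. When a consumer group has no committed offset for a partition (or the committed offset is out of range), the value of auto.offset.reset (earliest/latest/none) determines where the consumer starts reading. How long committed offsets are retained is controlled by the broker property offsets.retention.minutes.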
auto.offset.reset = latest
=> If a committed offset exists, the consumer resumes from it. If none is found, it starts at the end of the partition and reads only messages that arrive afterwards. No exception is thrown.
auto.offset.reset = earliest
=> Again, no exception is thrown. If a committed offset exists, the consumer resumes from it; if none is found, it starts reading from the beginning of the partition.
auto.offset.reset = none
=> The consumer throws an exception (NoOffsetForPartitionException) when no committed offset is found.
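For the corruption case, `none` is often the interesting setting, because missing offsets then surface as an exception instead of a silent reset. A minimal sketch of the consumer configuration, using plain string keys so it needs only the JDK; the class name `OffsetResetConfig`, the broker address, and the group id are placeholders I made up for illustration:

```java
import java.util.Properties;

public class OffsetResetConfig {

    // Builds consumer properties; pass the result to new KafkaConsumer<>(props).
    public static Properties build(String bootstrapServers, String groupId, String resetPolicy) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // "earliest", "latest" or "none"; only consulted when no valid committed offset exists
        props.setProperty("auto.offset.reset", resetPolicy);
        // Commit manually so offsets only move when the application decides
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, adjust to your cluster
        Properties props = build("localhost:9092", "my-group", "none");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

With `none`, the application can catch the exception from `poll()` and decide what to do, for example seek to an offset restored from a snapshot.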
You can use assign and seek to read from a specific offset:
// consumer is an existing KafkaConsumer, topic is the topic name (String)
// assign - set the topic and partition you want to read from using TopicPartition
TopicPartition topicPartitionToReadFrom = new TopicPartition(topic, 0);
long offsetToReadFrom = 15L;
consumer.assign(Arrays.asList(topicPartitionToReadFrom));
// seek - set the position of the consumer manually by calling
// KafkaConsumer.seek(TopicPartition partition, long offset)
consumer.seek(topicPartitionToReadFrom, offsetToReadFrom);
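To store the offset log: __consumer_offsets is itself a topic, so you can write a consumer for it and store the messages in a storage of your choice. Note that its records are in Kafka's internal binary format, so they have to be decoded before they are readable.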
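If you only need snapshots, a simpler alternative to decoding the offsets topic is to ask Kafka for the committed offsets (for example with `KafkaConsumer.committed(Set<TopicPartition>)` or `AdminClient.listConsumerGroupOffsets(groupId)`) and write them to a file. The sketch below covers only the file part and uses nothing but the JDK. The class name `OffsetSnapshot`, the "topic-partition" key format, and the tab-separated file layout are my own assumptions, not anything Kafka defines:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OffsetSnapshot {

    // Writes one "topic-partition<TAB>offset" line per entry, sorted by key.
    public static void save(Map<String, Long> offsets, Path file) throws IOException {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> e : new TreeMap<>(offsets).entrySet()) {
            lines.add(e.getKey() + "\t" + e.getValue());
        }
        Files.write(file, lines, StandardCharsets.UTF_8);
    }

    // Reads a snapshot written by save() back into a sorted map.
    public static Map<String, Long> load(Path file) throws IOException {
        Map<String, Long> offsets = new TreeMap<>();
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            if (line.isEmpty()) {
                continue; // tolerate blank lines
            }
            int tab = line.lastIndexOf('\t');
            offsets.put(line.substring(0, tab), Long.parseLong(line.substring(tab + 1)));
        }
        return offsets;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical values; in practice fill this map from the committed offsets
        Map<String, Long> committed = new TreeMap<>();
        committed.put("orders-0", 15L);
        committed.put("orders-1", 42L);

        Path file = Files.createTempFile("offset-snapshot", ".tsv");
        save(committed, file);
        System.out.println(load(file));
    }
}
```

To restore, load the snapshot and call `consumer.seek(...)` for each partition with the saved offset, then commit.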