Can anyone tell me how to read messages from the beginning of a topic using the Kafka Consumer API, every time I run the consumer?
This works with the 0.9.x consumer. Basically, when you create a consumer you need to assign it a consumer group id using the property ConsumerConfig.GROUP_ID_CONFIG. Generate the consumer group id randomly every time you start the consumer, doing something like this:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
(properties is an instance of java.util.Properties that you will pass to the constructor new KafkaConsumer(properties)).
Generating the group id randomly means that the new consumer group doesn't have any offset associated with it in Kafka. So what we have to do after this is set a policy for that scenario. As the documentation for the auto.offset.reset property says:
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
- none: throw an exception to the consumer if no previous offset is found for the consumer's group
- anything else: throw an exception to the consumer.
So from the options listed above, we need to choose the earliest policy so that the new consumer group starts from the beginning every time.
Your Java code will look something like this:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumer = new KafkaConsumer(properties);
The only thing you still need to figure out is, when you have multiple consumers that belong to the same consumer group but are distributed, how to generate one random id and share it among those instances so they all join the same group.
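One way to do that (a minimal sketch, not from the original answer) is to have your deployment tooling generate the id once and hand it to every instance, for example through an environment variable; GROUP_RUN_ID here is a hypothetical variable name:
// Hypothetical: all instances read the same pre-generated id; fall back to a
// fresh UUID when the variable is not set (single-instance case).
String groupId = System.getenv().getOrDefault("GROUP_RUN_ID", UUID.randomUUID().toString());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);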
Hope it helps!

- This is not a good solution. Doing this will cause ZooKeeper data to pile up with new entries being constantly created and abandoned. It's better to delete the entry for your group as indicated by KingJulien below and the linked answer he posted. – CamW Oct 27 '16 at 06:50
- You need to close the consumer: consumer.close(); – Nautilus Oct 27 '16 at 17:35
One option is to use a unique group id each time you start, which means Kafka will send you the messages in the topic from the beginning. Do something like this when you set your properties for the KafkaConsumer:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
The other option is to use consumer.seekToBeginning(consumer.assignment()), but this will not work unless Kafka first gets a heartbeat from your consumer, which happens when the consumer calls the poll method. So call poll(), then do a seekToBeginning(), and then call poll() again if you want all the records from the start. It's a little hacky, but this seems to be the most reliable way to do it as of the 0.9 release.
// At this point there is no heartbeat from the consumer, so seekToBeginning() won't work.
// So call poll() first.
consumer.poll(0);
// Now there is a heartbeat and the consumer is "alive".
consumer.seekToBeginning(consumer.assignment());
// Now consume from the beginning.
ConsumerRecords<String, String> records = consumer.poll(0);

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
If you simply avoid saving any offsets, the consumer will always restart from the beginning.
For this to work, you can never use the commit API; that's what I mean by avoiding saving any offsets. Disabling auto commit does not count as a use of the commit API.
I've done this a lot and it works for me, especially when developing and testing. For prod I prefer skm's answer https://stackoverflow.com/a/47530912/1213475 since I always want commits in prod; it's a simple way to monitor consumption and consumer lag.
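A minimal end-to-end sketch of this approach (the bootstrap server, group id, and topic name are placeholders): auto commit is off, the reset policy is earliest, and no commit method is ever called, so the group never stores offsets and every run starts from the beginning.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("my-topic"));
    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
        // No commitSync()/commitAsync() anywhere, so no offsets are ever stored.
    }
}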
- As repeatedly documented, this is not sufficient: the offset reset policy is only used if no commits are found, but since setting auto commit to false implies that you are using a commit API, commits will still happen when messages are read. https://docs.confluent.io/platform/current/clients/consumer.html – sigma1510 Feb 01 '22 at 09:25
- "commits will still happen"? Why would it? If it's repeatedly documented, please provide a source (what you claim is not in the link you provided). – offroff Apr 26 '22 at 06:59
- "The consumer therefore supports a commit API which gives you full control over offsets. Note that when you use the commit API directly, you should first disable auto-commit in the configuration by setting the enable.auto.commit property to false. Each call to the commit API results in an offset commit request being sent to the broker." – https://docs.confluent.io/platform/current/clients/consumer.html – sigma1510 Apr 26 '22 at 13:01
- "The two main settings affecting offset management are whether auto-commit is enabled and the offset reset policy. First, if you set enable.auto.commit (which is the default), then the consumer will automatically commit offsets periodically at the interval set by auto.commit.interval.ms. The default is 5 seconds. Second, use auto.offset.reset to define the behavior of the consumer when there is no committed position (which would be the case when the group is first initialized) or when an offset is out of range" – https://docs.confluent.io/platform/current/clients/consumer.html#id1 – sigma1510 Apr 26 '22 at 13:09
- TL;DR: auto-commit == true -> the consumer commits at intervals of x seconds. auto.commit == false -> the consumer commits when there is a call to the commit API. Commits still happen. The offset is still updated. The queue does not return to the beginning. – sigma1510 Apr 26 '22 at 13:11
- What I suggest is to never use the commit API (that's what I mean by avoiding saving any offsets); disabling auto commits does not count as a usage of the commit API. I've done this a lot and it works, especially when developing and testing. For prod I prefer skm's answer https://stackoverflow.com/a/47530912/1213475 since I always want commits in prod; it's such a simple way to monitor the consumption and the consumer lag. – offroff Apr 28 '22 at 07:52
- In that case you need to update your answer to include that information. As it stands, your answer is insufficient. See the Stack Overflow guidelines: https://stackoverflow.com/help/how-to-answer and https://stackoverflow.com/conduct – sigma1510 Apr 28 '22 at 09:49
- Ok, I don't agree with your interpretation of my answer; I think it's a simple and correct answer. Turn off auto commits and don't save any other commits as well. It's really simple. – offroff Apr 29 '22 at 09:57
One possible solution is to use an implementation of ConsumerRebalanceListener while subscribing to one or more topics. The ConsumerRebalanceListener provides callback methods invoked when partitions are assigned to or revoked from a consumer. The following code sample illustrates this:
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SkillsConsumer {

    private static final int POLL_TIMEOUT = 5000;

    private final String topic;
    private final KafkaConsumer<String, String> consumer;

    public SkillsConsumer(String topic) {
        this.topic = topic;
        Properties properties = ConsumerUtil.getConsumerProperties();
        properties.put("group.id", "consumer-skills");
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(Collections.singletonList(this.topic),
                new PartitionOffsetAssignerListener(this.consumer));
    }
}

public class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {

    private final KafkaConsumer consumer;

    public PartitionOffsetAssignerListener(KafkaConsumer kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Read every newly assigned partition from the beginning.
        for (TopicPartition partition : partitions) {
            consumer.seekToBeginning(Collections.singleton(partition));
        }
    }
}
Now whenever the partitions are assigned to the consumer, each partition will be read from the beginning.
- This is the cleanest solution IMHO. As a tiny improvement: the `for`-loop inside the `onPartitionsAssigned` method can be replaced by `consumer.seekToBeginning(partitions)` (works for kafka-2.1.0 in my tests). – Yi Ou Mar 17 '19 at 18:20
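Following that comment, on newer clients (roughly 0.10+, where seekToBeginning takes a whole Collection) the callback reduces to a single call:
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // seekToBeginning accepts the full collection, so no per-partition loop is needed.
    consumer.seekToBeginning(partitions);
}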
1) https://stackoverflow.com/a/17084401/3821653
To reset the consumer group, you can delete the group id path in ZooKeeper:
import kafka.utils.ZkUtils;
ZkUtils.maybeDeletePath(<zkhost:zkport>, </consumers/group.id>);
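Note that this applies to the old ZooKeeper-based consumer; on modern brokers, group offsets live in the internal __consumer_offsets topic, and the rough equivalent (a sketch; the group name is a placeholder) is to delete the group with the kafka-consumer-groups tool:
# The group must be inactive (no live members) for the delete to succeed.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --delete --group my-group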

So for me, what worked was a combination of what has been suggested above. The key change was to include
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
and have a randomly generated group id each time. But this alone didn't work for me: for some reason, the first time I polled the consumer it never got any records. I had to hack it to get it to work:
consumer.poll(0); // without this the below statement never got any records
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
I'm new to Kafka and have no idea why this is happening (the seekToBeginning answer above suggests the first poll() is what joins the group and gets partitions assigned), but for anyone else still trying to get this to work, hope this helps.

- I had to resort to this approach too in order to get all the data each time I wanted to read everything from the topic. – mikemil Jan 29 '19 at 15:58
This is my code to read messages from the beginning (using Java 11):
try (var consumer = new KafkaConsumer<String, String>(config)) {
    consumer.subscribe(Set.of(topic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Rewind every newly assigned partition to its first offset.
            consumer.seekToBeginning(partitions);
        }
    });

    // polling messages
}
You can see the full code example here: https://gist.github.com/vndung/4c9527b3aeafec5d3245c7a3b921f8b1

When using the old high-level consumer, set props.put("auto.offset.reset", "smallest"); at the time you create the ConsumerConfig.
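A minimal sketch with that old (pre-0.9, ZooKeeper-based) high-level consumer API; the host:port values and the group id are placeholders:
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "my-group");
props.put("auto.offset.reset", "smallest"); // this API's counterpart of "earliest"
kafka.consumer.ConsumerConfig config = new kafka.consumer.ConsumerConfig(props);
kafka.javaapi.consumer.ConsumerConnector connector =
        kafka.consumer.Consumer.createJavaConsumerConnector(config);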

- This will ONLY make sure that when you are reading the FIRST time it will read from the beginning. Subsequent reads will totally ignore this setting and read from the last offset. – KingJulien Mar 05 '16 at 16:44
If you are using the Java consumer API, more specifically org.apache.kafka.clients.consumer.Consumer, you can try the seek* methods:
consumer.seekToBeginning(consumer.assignment())
Here, consumer.assignment() returns all the partitions assigned to the given consumer, and seekToBeginning starts from the earliest offset for the given collection of partitions.

- This worked for me, but I needed to poll until there was actually an assignment: while (consumer.assignment().isEmpty()) { consumer.poll(Duration.ofMillis(0)); }. Once that loop finished, I polled normally and received all records from the beginning. – Ant Kutschera May 03 '20 at 11:58
- Thanks for the above comment (regarding waiting for the assignment) - now works perfectly. – Dan Sep 01 '20 at 12:41
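Putting this answer and those comments together, a minimal sketch (the topic name is a placeholder and consumer is an already-configured KafkaConsumer<String, String>):
consumer.subscribe(Collections.singletonList("my-topic"));

// Poll until the group join finishes and partitions are actually assigned;
// any records returned by these warm-up polls are discarded.
while (consumer.assignment().isEmpty()) {
    consumer.poll(Duration.ofMillis(100));
}

// Rewind every assigned partition, then poll normally from the beginning.
consumer.seekToBeginning(consumer.assignment());
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));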
To always read from offset 0 without creating a new group id every time:
// ... assuming the props have been set properly,
// with enable.auto.commit and auto.offset.reset left at their defaults
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(0); // without this, the assignment will be empty

consumer.assignment().forEach(t -> {
    System.out.printf("Set %s to offset 0%n", t);
    consumer.seek(t, 0);
});

while (true) {
    // ... the consumer polls messages as usual
}

Another option is to keep your consumer code simple and steer the offset management from outside, using the command line tool kafka-consumer-groups that comes with Kafka.
Each time, before starting the consumer, you would call:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--execute --reset-offsets \
--group myConsumerGroup \
--topic myTopic \
--to-earliest
Depending on your requirements, you can reset the offsets for each partition of the topic with that tool. The help text and documentation explain the options:
--reset-offsets also has the following scenarios to choose from (at least one scenario must be selected):
--to-datetime <String: datetime> : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest : Reset offsets to earliest offset.
--to-latest : Reset offsets to latest offset.
--shift-by <Long: number-of-offsets> : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
--from-file : Reset offsets to values defined in CSV file.
--to-current : Resets offsets to current offset.
--by-duration <String: duration> : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
--to-offset : Reset offsets to a specific offset.
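For example, to reset only partition 0 of the topic, you can use the tool's topic:partition syntax; leaving out --execute makes it a dry run that only prints the planned offsets without applying them:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --reset-offsets \
  --group myConsumerGroup \
  --topic myTopic:0 \
  --to-earliest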
