I would like to read all the messages in a topic on the server, starting from the beginning.

Ex:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning

With the console command above I get all the messages in the topic from the beginning, but I couldn't consume all the messages from the beginning using Java code.

Manav Garg
gsc0441

4 Answers

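You can read the messages from the beginning of a topic with the following command (`--max-messages 100` stops after 100 messages; omit it to read the whole topic):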

cd Users/kv/kafka/bin

./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic topicName --from-beginning --max-messages 100
Mateusz Piotrowski
KayV

The easiest way would be to start a consumer and drain all the messages. I don't know how many partitions your topic has or whether you already have an existing consumer group, but you have a few options:

Have a look at this API: https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

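1) If you already have a consumer in the same consumer group and still want to start consuming from the beginning, use the seek methods listed in the API doc: call seekToBeginning() (or seek() to the earliest available offset) for every partition assigned to the consumers in the group. Offset 0 may no longer exist once retention has deleted old log segments, which is why seekToBeginning() is safer than seeking to 0 directly.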

2) Otherwise, you can start a few consumers in a new consumer group & you would not have to worry about seek.

PS: Please remember to provide more details about your setup in the future if you have more questions on Kafka. A lot of things depend on how you have configured your infrastructure & how you would prefer it to be and would thus vary from case to case.

Manav Garg
  • For a new consumer group, you also need to set `auto.offset.reset = earliest` – Matthias J. Sax Aug 13 '16 at 10:06
  • Also, it would be good if you could specify the Kafka version explicitly in your documentation answer, because consumer offsets are handled very differently across versions. Before 0.9 this was the job of ZooKeeper, but now it is handled by the Kafka topic "__consumer_offsets". I find many people getting confused because things change between versions. – Manav Garg Aug 13 '16 at 17:28
  • I agree -- actually, SO Documentation supports an explicit "version" field, but it was disabled for me when I wrote the stuff, so I could not include the version :( Maybe I can set it after the topic is reviewed and accepted. – Matthias J. Sax Aug 13 '16 at 18:03
  • Thanks @Manav, I got all the messages using the following code, as per your suggestion to seek: `TopicPartition topicPartition = new TopicPartition(topic, 0); List<TopicPartition> partitions = Arrays.asList(topicPartition); consumer.assign(partitions); consumer.seekToBeginning(partitions);` – gsc0441 Aug 15 '16 at 21:36
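// Manually assign partition 0 of the topic (bypasses consumer-group rebalancing);
// note that any other partitions of the topic are not read.
TopicPartition topicPartition = new TopicPartition(topic, 0);
List<TopicPartition> partitions = Arrays.asList(topicPartition);
consumer.assign(partitions);
// Rewind to the earliest available offset; applied lazily on the next poll()/position()
consumer.seekToBeginning(partitions);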
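The snippet above reads only partition 0 and never says when to stop. One way to replay every partition and stop once caught up, assuming a Java client of 0.10.1 or later (where `KafkaConsumer.endOffsets` exists), is: build the partition list from `consumer.partitionsFor(topic)`, `assign` it, call `seekToBeginning`, take a one-off snapshot with `consumer.endOffsets(partitions)`, and after each `poll` record `consumer.position(tp)` for every partition until all positions reach the snapshot. `DrainProgress` below is a hypothetical helper (not part of Kafka) that holds only that stop condition; it is generic in the partition key so it compiles without the Kafka client, and in real code `P` would be `TopicPartition`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not a Kafka API): tracks whether a consumer that started
 * from the beginning has reached the end offsets snapshotted when it started.
 * P is the partition key (TopicPartition when used with the Kafka client).
 */
final class DrainProgress<P> {
    // End offsets captured once, before draining (e.g. from consumer.endOffsets(...))
    private final Map<P, Long> endOffsets;
    // Latest known position per partition (offset of the next record to read)
    private final Map<P, Long> positions = new HashMap<>();

    DrainProgress(Map<P, Long> endOffsets) {
        this.endOffsets = new HashMap<>(endOffsets);
    }

    /** Record a partition's current position, e.g. from consumer.position(tp). */
    void update(P partition, long position) {
        positions.put(partition, position);
    }

    /** True once every partition's recorded position has reached its end offset. */
    boolean caughtUp() {
        for (Map.Entry<P, Long> entry : endOffsets.entrySet()) {
            Long position = positions.get(entry.getKey());
            // A partition with no recorded position is treated as not caught up yet
            if (position == null || position < entry.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

For example, with end offsets `{t-0=3, t-1=0}`, `caughtUp()` stays false until positions of at least 3 and 0 have been recorded for the two partitions. Records produced after the snapshot are deliberately not waited for, so the loop terminates even on a busy topic.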
Community
gsc0441

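Just change the consumer group by setting

ConsumerConfig.GROUP_ID_CONFIG - to a new group id

and set

ConsumerConfig.AUTO_OFFSET_RESET_CONFIG - to "earliest"

A group with no committed offsets then starts from the earliest available offset of each partition.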

Sample code:

    props.put(ConsumerConfig.GROUP_ID_CONFIG, "newID");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
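The two settings above can also be wrapped in a small helper. This is a minimal sketch: `FromBeginningProps` is hypothetical, it uses the literal key strings behind the `ConsumerConfig` constants (`"group.id"`, `"auto.offset.reset"`, `"bootstrap.servers"`, `"key.deserializer"`, `"value.deserializer"`) so it needs only `java.util`, and the String deserializers are an assumption about the topic's data. The resulting `Properties` would be passed to `new KafkaConsumer<String, String>(props)` before calling `subscribe`.

```java
import java.util.Properties;
import java.util.UUID;

/** Hypothetical helper: consumer settings that replay a topic from the start. */
final class FromBeginningProps {
    private FromBeginningProps() {}

    static Properties build(String bootstrapServers, String groupPrefix) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A fresh group id has no committed offsets, so the reset policy applies
        props.put("group.id", groupPrefix + "-" + UUID.randomUUID());
        // With no committed offset, start from the oldest retained record
        props.put("auto.offset.reset", "earliest");
        // Assumption: keys and values are plain strings
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

The random suffix makes every run a brand-new group, so each run replays the topic from the start; with a fixed group name only the first run would, because later runs resume from the group's committed offsets. The trade-off is that each run leaves behind a group that is never reused.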