
I need to be able to consume all messages from a topic, from the beginning. Essentially identical to this StackOverflow query, but updated for Kafka 0.9. (There seem to be relatively few 0.9-specific StackOverflow answers).

Kafka High Level Consumer Fetch All Messages From Topic Using Java API (Equivalent to --from-beginning)

0.9 has a sufficiently different API that I don't really know where to start. I can do this from the command-line using a provided bash script, but don't know how to move forward.

Could you please point me to the appropriate methods, or provide a small sample script to get me started? Thank you!

1 Answer


You need to set `auto.offset.reset` to `earliest`. See https://kafka.apache.org/documentation.html#newconsumerconfigs

See also https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L179
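To illustrate, here is a minimal sketch of a 0.9 consumer configuration set up to read from the beginning. The broker address, topic-related values, and the `ReplayConfig` class name are placeholders, not anything from the Kafka API itself; the key point is that `auto.offset.reset=earliest` only applies when the consumer group has no committed offset, so a never-before-used group id is paired with it here:

```java
import java.util.Properties;
import java.util.UUID;

public class ReplayConfig {
    // Minimal 0.9 consumer config for reading a topic from the beginning.
    // Broker address and deserializers are placeholders; adjust to your setup.
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // "earliest" only takes effect when the group has no committed offset,
        // so use a group id that has never committed (e.g. a random one).
        props.put("group.id", "replay-" + UUID.randomUUID());
        props.put("auto.offset.reset", "earliest");
        // Disable auto-commit so this replay does not record offsets
        // that a later run of the same group would resume from.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

You would then pass this to `new KafkaConsumer<>(ReplayConfig.buildProps())` and `subscribe`/`poll` as usual.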

Matthias J. Sax
  • Thank you! Is there any way to do this without modifying an external properties file? A solution I could use within my code would be ideal. –  May 03 '16 at 21:09
  • You can set it in `Properties` -- same as in v0.8.x. – Matthias J. Sax May 03 '16 at 21:20
  • I added `consumerProps.setProperty("auto.offset.reset", "earliest")` to my properties set-up code, then used it to create a new KafkaConsumer. Behavior seems the same as earlier. Is there any further information I could give you that might make it easier for you to help me? –  May 03 '16 at 23:01
  • I then use the consumer I create as follows `ConsumerRecords records = consumer.poll(timeout); if (records.isEmpty()) System.out.println("EMPTY!"); for (ConsumerRecord record: records) System.out.println(record.value()); consumer.close();` –  May 03 '16 at 23:06
  • Could the new property be conflicting with these properties I set? `enable.auto.commit=true receive.buffer.bytes=262144 value.deserializer=org.apache.kafka.common.serialization... group.id=test auto.offset.reset=earliest session.timeout.ms=10000 bootstrap.servers=localhost:9092 max.partition.fetch.bytes=2097152 key.deserializer=org.apache.kafka.common.serialization... fetch.min.bytes=50000 ` –  May 03 '16 at 23:11
  • `Properties` overwrite values from config files. There should be no conflict. Using `KafkaConsumer` with `earliest`, you cannot read data from before the last committed offset of the same "group". On the first start of your consumer, you should get all messages, as no commit has happened so far. Disabling auto-commit might also help. Using `SimpleConsumer` instead of `KafkaConsumer` is also an option. Not sure what behavior you exactly want to get. Please describe observed and expected behavior. – Matthias J. Sax May 04 '16 at 11:00
  • I'm not sure I want to use SimpleConsumer, since I'm on Kafka 0.9. I've detailed current behavior in another question. I expect each new consumer to read the topic from the beginning, but they seem to stop at the last unconsumed offset. I'd appreciate your help on the other question too. http://stackoverflow.com/questions/37016032/apache-kafka-0-9-java-api-kafkaconsumer-not-acting-as-expected –  May 04 '16 at 15:27
  • If the consumers belong to the same group, they read from the last committed offset. You would need to change the group id to read from the beginning, or (for single consumers -- i.e., not parallel) omit the group id. – Matthias J. Sax May 04 '16 at 15:37
  • Giving each consumer a randomized group ID did the trick! Thank you! :) –  May 04 '16 at 17:02
  • Maybe it might be better to use `.seekToBeginning()` :) – Matthias J. Sax Mar 31 '17 at 17:28
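For completeness, a sketch of the `.seekToBeginning()` approach mentioned in the last comment, under the assumptions that the topic is named `my-topic` and the broker runs locally. In the 0.9 API, `seekToBeginning` takes varargs of `TopicPartition`, and the consumer must already have its partition assignment, so one `poll(0)` after `subscribe` is used here to force the assignment first. No test block accompanies this sketch since it needs a running broker:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekToBeginningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // With seekToBeginning, the group id can stay fixed:
        // the explicit seek overrides any committed offset.
        props.put("group.id", "my-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            // Poll once so the coordinator assigns partitions, then
            // rewind every assigned partition to its earliest offset.
            consumer.poll(0);
            consumer.seekToBeginning(
                    consumer.assignment().toArray(new TopicPartition[0]));

            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
```

Note that "beginning" means the earliest offset still retained by the broker, which may be greater than zero once log retention has kicked in.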