
I have a topic named `test` in my broker. I verified this with the CLI.

I created a Java producer to send messages to the topic `test`, and I can consume them from the CLI (I am running this on Windows):

.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

However, when I run my Java consumer program, it doesn't consume any messages, even though I set `auto.offset.reset` to `earliest`. What am I doing wrong?

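import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer1 {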

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "jin");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        //consumer.subscribe(Collections.singletonList("test"));
        consumer.subscribe(Arrays.asList("test"));

        try {
            while (true) {              
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                //ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records){
                    // %n ends each record with a newline so the output is one record per line
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                //consumer.commitAsync();               
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            consumer.close();
            System.out.println("closed");
        }   
    }
}
Jin Lee
  • Some SO links - https://stackoverflow.com/a/57196346/9657108, https://stackoverflow.com/a/56822086/9657108, https://stackoverflow.com/a/56822200/9657108 – arunkvelu Jul 26 '19 at 02:55
  • Have you tried changing this group id, `props.put("group.id", "jin");`? – Ryuzaki L Jul 26 '19 at 03:59
  • @Deadpool wow. I changed my group id and it works!!! I don't know why though. Hey, please post it as an answer. I'll accept it. – Jin Lee Jul 26 '19 at 04:23

1 Answer


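The `auto.offset.reset` property only takes effect for a brand new consumer group, or for a group whose committed offsets have been deleted. It has no effect for a consumer group that already has offsets stored in Kafka. That is why changing the group id made the consumer read from the beginning: the new group had no committed offsets yet.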
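One way to try this, matching what worked in the comments, is to give the consumer a group id that has never committed offsets. The following is only a sketch using `java.util`; the class name `FreshGroupProps`, the helper `withFreshGroup`, and the random suffix are made up for illustration and are not part of the Kafka API.

```java
import java.util.Properties;
import java.util.UUID;

public class FreshGroupProps {

    // Builds consumer properties whose group.id is new on every call,
    // so no committed offsets exist for it and auto.offset.reset applies.
    public static Properties withFreshGroup(String prefix) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Hypothetical naming scheme: "<prefix>-<random UUID>"
        props.put("group.id", prefix + "-" + UUID.randomUUID());
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = withFreshGroup("jin");
        // Print the generated group id so it can be inspected or reused later
        System.out.println(props.getProperty("group.id"));
    }
}
```

The returned `Properties` can be passed to `new KafkaConsumer<>(props)` as in the question. Because every run starts a new group and re-reads the whole topic, this is probably best kept for debugging.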

The Kafka documentation describes `auto.offset.reset` as "what to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted)".
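The rule can be sketched as a small decision function. This is a simplified model of the behaviour described above, not Kafka's actual implementation; `OffsetResetModel`, `startPosition`, and the offsets 0 and 42 are hypothetical, and `IllegalStateException` merely stands in for the error a consumer reports when the policy is `none`.

```java
import java.util.OptionalLong;

public class OffsetResetModel {

    // Picks the position a consumer group starts reading from in one partition.
    // logStart and logEnd are the earliest and latest offsets on the server.
    public static long startPosition(OptionalLong committed, String resetPolicy,
                                     long logStart, long logEnd) {
        // A committed offset that still exists on the server always wins;
        // auto.offset.reset is not consulted at all in this case.
        if (committed.isPresent()
                && committed.getAsLong() >= logStart
                && committed.getAsLong() <= logEnd) {
            return committed.getAsLong();
        }
        // No usable committed offset: fall back to auto.offset.reset.
        switch (resetPolicy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalStateException("no valid offset and policy is " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // Existing group with a committed offset of 42: "earliest" is ignored.
        System.out.println(startPosition(OptionalLong.of(42), "earliest", 0, 42)); // prints 42
        // Brand new group with nothing committed: "earliest" applies.
        System.out.println(startPosition(OptionalLong.empty(), "earliest", 0, 42)); // prints 0
    }
}
```

The first call corresponds to the original group `jin`, which had already consumed and committed offsets, so polling returned nothing new. The second corresponds to the renamed group.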

Ryuzaki L