
I am implementing a Kafka consumer class to receive messages. I want to get only the new messages each time, so I set enable.auto.commit to true. However, the offset does not seem to change at all, even though the topic, consumer group, and partition have always been the same.

Here is my consumer code:

    Properties consumerConfig = new Properties();
    consumerConfig.put("bootstrap.servers", bootstrap);
    consumerConfig.put("group.id", KafkaTestConstants.KAFKA_GROUP);
    consumerConfig.put("enable.auto.commit", "true");
    consumerConfig.put("auto.offset.reset", "earliest");
    consumerConfig.put("auto.commit.interval", 1000);
    consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
    StringDeserializer deserializer = new StringDeserializer();
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

    TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);
    long offset = kafkaConsumer.position(tp);
    System.out.println("Offset of partition: " + offset);

    List<String> messages = new ArrayList<String>();
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Message received: " + record.value());
        messages.add(record.value());
    }

    kafkaConsumer.commitAsync();
    System.out.println("Offset committed.\n");
    kafkaConsumer.close();

No matter how many times I run it, it always shows the offset as 0 and therefore always receives all messages from the very beginning. What am I missing?

EDIT: Based on Matthias's answer, I decided to commit the offset manually. However, commitSync() hangs. commitAsync() sort of works; I will explain the "sort of" later. Here is what the code does:

the producer sends 2 messages;
the consumer initializes;
print out the current position;
consumer.poll();
print the received messages;
consumer.commitAsync();

This is how the code behaves. Say I have 100 messages. Now the producer sends 2 new messages. Before the consumer polls, the current offset position shows as 102, when it should be 100. Therefore, no new messages are printed out. It is almost as if the offset is updated after the producer sends the messages.

ddd
  • "auto.commit.interval" to "auto.commit.interval.ms" ? -> https://kafka.apache.org/documentation/#newconsumerconfigs – Quentin Geff Mar 09 '17 at 17:20
  • @QuentinGeff Changed to `auto.commit.interval.ms` and still the same. – ddd Mar 09 '17 at 17:30
  • Did you try to control the consumer offset manually ? as explain here : https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html – Quentin Geff Mar 09 '17 at 17:49
  • @QuentinGeff I tried both `commitSync()` and `commitAsync()`, and still not working. I printed out offset after receiving each message and it increments just fine. It seems like offset gets somehow reset after every run. – ddd Mar 09 '17 at 18:05

1 Answer


Auto commit only works if you use consumer group management, and for this, you need to "subscribe" to a topic, but not "assign" partitions manually.

Compare the JavaDocs of KafkaConsumer. It's a long read, but required to understand the subtle details of how to use the consumer correctly: https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Furthermore, if auto-commit is enabled, it commits within poll (ie, a call to poll() might commit the messages returned from the previous call to poll()) and not when you iterate through the returned messages. This also means that your commits will "jump" forward, e.g., from committed offset 0 to 100 (if you received 100 messages by poll for a single partition).
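A minimal subscribe()-based sketch of what this answer describes (the broker address, topic, and group names are placeholders, and running it requires a live broker plus the kafka-clients dependency):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SubscribeExample {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        config.put("group.id", "my-group");                // placeholder group
        config.put("enable.auto.commit", "true");
        config.put("auto.commit.interval.ms", "1000");     // note the ".ms" suffix
        config.put("auto.offset.reset", "earliest");
        config.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            // subscribe(), not assign(): the group coordinator assigns partitions,
            // which is what enables auto-commit via consumer group management.
            consumer.subscribe(Collections.singletonList("my-topic"));
            // Offsets for records returned here are auto-committed inside a
            // later call to poll(), not while you iterate over the records.
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Message received: " + record.value());
            }
        }
    }
}
```

With this pattern the consumer resumes from the last committed group offset on restart, and auto.offset.reset only applies when the group has no committed offset yet.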

Matthias J. Sax
  • @MatthiasJSax `subscription` does not work for me. The `polling` hangs forever. That's why I switched to `assign`. See the [question I asked earlier](http://stackoverflow.com/questions/42496443/why-consumer-hangs-while-consuming-messages-from-kafka-on-dc-os-using-client-api). – ddd Mar 09 '17 at 19:31
  • Not sure why `subscribe()` lets the consumer hang. However, if you don't use "consumer group management" you need to take care of fail-over scenarios manually (ie, if you have multiple consumers you need to ensure they are assigned different partitions -- if you don't want to read data multiple times -- and you need to detect failures yourself and re-assign the partitions of a failed consumer to the remaining consumers manually -- or some data does not get processed). You also need to commit manually, read the committed offset manually on startup to resume, and "auto.offset.reset" does not work either. – Matthias J. Sax Mar 09 '17 at 19:52
  • Thus, I would recommend finding the root cause of the problem with `subscribe()` instead of switching to `assign()`. But it's of course up to you. – Matthias J. Sax Mar 09 '17 at 19:53
  • I've tried manual offset commit. It did seem to update the offset. However, the commit seems to occur after the messages are produced and before the consumption. See details in my edit. – ddd Mar 09 '17 at 21:19
  • Can you update your consumer code? Do you use `KafkaConsumer#committed(TopicPartition)`? – Matthias J. Sax Mar 10 '17 at 00:56
  • there is really nothing to update except `commitAsync()`. What does `committed(TopicPartition)` do? – ddd Mar 10 '17 at 01:33
  • `committed()` returns the last committed offset of a partition (as described in the JavaDocs -- it would be helpful if you read those ;)) – Matthias J. Sax Mar 10 '17 at 04:26
  • I tried to add `long offset = consumer.committed(tp).offset(); consumer.seek(tp, offset);` after assigning the partition and got a NullPointerException while getting the offset – ddd Mar 10 '17 at 15:31
  • Hi, is there a solution to this? I am facing a similar issue :( – user1372469 Jun 25 '22 at 14:26
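A guarded version of the `committed()` + `seek()` snippet from the comments might look like this (a sketch reusing the `kafkaConsumer` and `tp` variables from the question's code; `committed()` returns null when the group has no committed offset for the partition yet, which explains the NullPointerException above):

```java
    // committed(tp) returns null if this consumer group has never committed
    // an offset for the partition, so check before calling offset().
    OffsetAndMetadata committed = kafkaConsumer.committed(tp);
    if (committed != null) {
        kafkaConsumer.seek(tp, committed.offset());
    } else {
        // No committed offset yet: fall back to the beginning (or use
        // seekToEnd() if only new messages are wanted).
        kafkaConsumer.seekToBeginning(Arrays.asList(tp));
    }
```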