
I need to read a set of records from a start offset to an end offset, and I use a dedicated Kafka consumer for this purpose. I am OK with at-least-once semantics (if a given application instance goes down, the new application instance re-reads the records from that start offset).

So, can I use code like this?

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

private static KafkaConsumer<Long, String> createConsumer() {

    final Properties props = new Properties();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    // Auto-commit is disabled explicitly; no offsets are ever committed.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // Note: no group.id is configured, because the partition is assigned
    // manually and the position is controlled with seek().
    return new KafkaConsumer<>(props);
}

public void process() {

    KafkaConsumer<Long, String> consumer = createConsumer();
    TopicPartition topicPartition = new TopicPartition("topic", 2);
    consumer.assign(List.of(topicPartition));

    long startOffset = 42;
    long endOffset = 100;

    consumer.seek(topicPartition, startOffset);

    boolean isRunning = true;
    while (isRunning) {
        final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));

        for (ConsumerRecord<Long, String> record : consumerRecords) {
            if (record.offset() >= endOffset) {
                isRunning = false;
                break;
            }
            // process the record here
        }
    }

    consumer.close();
}

So:

  • I never call commit()
  • I disable auto-commit
  • I have no group.id

Is this code correct? Or does it have some hidden problems?

Max
  • You can, even though it looks like you have the wrong technology. What would be wrong with loading those events into a proper DB and reloading them on demand? This answer could be [interesting](https://stackoverflow.com/questions/28561147/how-to-read-data-using-kafka-consumer-api-from-beginning) as well. There are a couple of ways to achieve what you want to do – senseiwu Mar 20 '19 at 18:55
  • Possible duplicate of [How to read data using Kafka Consumer API from beginning?](https://stackoverflow.com/questions/28561147/how-to-read-data-using-kafka-consumer-api-from-beginning) – senseiwu Mar 20 '19 at 18:58
  • You should also change your code to use `props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");` – senseiwu Mar 20 '19 at 19:02
  • @senseiwu is `props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");` faster? – Max Mar 20 '19 at 19:06
  • if you don't add it, then you won't consume from the earliest offset as a new consumer. – senseiwu Mar 20 '19 at 19:08
  • @senseiwu I cannot understand why you are talking about the `earliest` offset. My initial task is about reading from some `start` to some `finish`; it is not about the `earliest` offset. – Max Mar 20 '19 at 19:10
  • ok, I overlooked that. Then you could. What would be wrong, in any case, with committing already-consumed messages? – senseiwu Mar 20 '19 at 19:13
  • @senseiwu as I understand it, a `commit` is a network request to the `kafka broker`. So it is additional load on the broker and the network, right? It seems that I can avoid these calls. – Max Mar 20 '19 at 19:15
  • Offset commits are done using a binary protocol over TCP. They are relatively lightweight (see the commit sketch after this thread) – senseiwu Mar 21 '19 at 08:18
  • @senseiwu can you tell me why a synchronous commit takes 500 ms to execute? – Max Mar 21 '19 at 21:07
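
Since the thread above discusses the cost of commits: below is a minimal, hedged sketch of what a manual commit could look like if progress did need to be persisted. It assumes a consumer configured with a `group.id` (committed offsets are stored per consumer group), and the helper name `pollAndCommit` is made up for illustration. `commitSync()` blocks until the group coordinator acknowledges the commit, which is why it shows up as a synchronous network round trip in timings.

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch only: assumes the consumer was created with a group.id, e.g.
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
static void pollAndCommit(KafkaConsumer<Long, String> consumer, TopicPartition tp) {
    ConsumerRecords<Long, String> records = consumer.poll(Duration.ofMillis(1000));

    long lastOffset = -1;
    for (ConsumerRecord<Long, String> record : records) {
        // process the record first, then commit: at-least-once semantics
        lastOffset = record.offset();
    }

    if (lastOffset >= 0) {
        // The committed value is the offset of the *next* record to read, hence +1.
        // commitSync() blocks until the broker acknowledges the commit.
        consumer.commitSync(
                Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
    }
}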

1 Answer


Yes, it is correct usage, and you shouldn't run into any issues. It's not typical usage of a Kafka consumer, but it is allowed.

From the official KafkaConsumer javadoc:

https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Controlling The Consumer's Position

In most use cases the consumer will simply consume records from beginning to end, periodically committing its position (either automatically or manually). However Kafka allows the consumer to manually control its position, moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to the most recent records without actually consuming the intermediate records. There are several instances where manually controlling the consumer's position can be useful.

...

Kafka allows specifying the position using seek(TopicPartition, long) to specify the new position. Special methods for seeking to the earliest and latest offset the server maintains are also available ( seekToBeginning(Collection) and seekToEnd(Collection) respectively).
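
For illustration, here is a minimal sketch of those seek variants; the topic name and partition number are placeholders, and `consumer` is assumed to be created as in the question:

import java.util.Collections;
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("topic", 2);
consumer.assign(Collections.singletonList(tp));

consumer.seek(tp, 42L);                                  // jump to an arbitrary offset
consumer.seekToBeginning(Collections.singletonList(tp)); // earliest offset the broker still retains
consumer.seekToEnd(Collections.singletonList(tp));       // offset of the next record to be written

Note that seekToBeginning() and seekToEnd() are evaluated lazily, on the next poll() or position() call.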

mjuarez