8

I have the following code

class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String, String>>) {

    fun run() {
        consumer.seekToEnd(emptyList())
        val pollDuration = 30L // seconds

        while (true) {
            val records = consumer.poll(Duration.ofSeconds(pollDuration))
            // perform record analysis and commitSync()
        }
    }
}

The topic which the consumer is subscribed to continuously receives records. Occasionally, the consumer will crash due to the processing step. When the consumer is then restarted, I want it to consume from the latest offset on the topic (i.e. ignore records that were published to the topic while the consumer was down). I thought the seekToEnd() method would ensure that. However, it seems like the method has no effect at all. The consumer starts to consume from the offset at which it crashed.

What is the correct way to use seekToEnd()?

Edit: The consumer is created with the following configs

fun <T> buildConsumer(valueDeserializer: String): KafkaConsumer<String, T> {
    val props = setupConfig(valueDeserializer)
    Common.setupConsumerSecurityProtocol(props)
    return createConsumer(props)
}

fun setupConfig(valueDeserializer: String): Properties {
    // Configuration setup
    val props = Properties()

    props[ConsumerConfig.GROUP_ID_CONFIG] = config.applicationId
    props[ConsumerConfig.CLIENT_ID_CONFIG] = config.kafka.clientId
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafka.bootstrapServers
    props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.kafka.schemaRegistryUrl

    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = config.kafka.stringDeserializer
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = valueDeserializer
    props[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = "true"

    props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = config.kafka.maxPollIntervalMs
    props[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = config.kafka.sessionTimeoutMs

    props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "false"
    props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
    props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"

    return props
}

fun <T> createConsumer(props: Properties): KafkaConsumer<String, T> {
    val consumer = KafkaConsumer<String, T>(props)
    consumer.subscribe(listOf(config.kafka.inputTopic))
    return consumer
}
Liverbird97
  • You should try commitSync before you poll – OneCricketeer Dec 03 '21 at 16:29
  • Also, if you disable auto commits and set `auto.offset.reset=latest`, it'll always start from the end of the topic upon restart – OneCricketeer Dec 04 '21 at 15:53
  • 1
    The `auto.offset.reset` configuration ONLY kicks in if the consumer group does not have a valid offset committed somewhere, as explained here: https://stackoverflow.com/a/32392174/11724337 – Liverbird97 Dec 06 '21 at 07:17
  • And why would you think that adding commitSync before poll would help? – Liverbird97 Dec 06 '21 at 08:52
  • If you don't auto commit (and your code doesn't commit itself), then there won't be any stored offset, therefore it'll always seek to that setting. Otherwise, if you commit before polling, then you'll be guaranteed to store the end offsets for the consumer group – OneCricketeer Dec 06 '21 at 14:26
  • I tried adding `commitSync()` right after the `seekToEnd()` call, but unfortunately it didn't do any difference :// – Liverbird97 Dec 06 '21 at 14:45

2 Answers

5

I found a solution!

I needed to add a dummy poll as part of the consumer initialization process. Since partition assignment happens lazily, a dummy poll is needed for partitions to actually be assigned to the consumer. Without the dummy poll, the consumer tries to seek to the end of partitions that have not yet been assigned. As a result, seekToEnd() has no effect.

It is important that the dummy poll duration is long enough for the partitions to get assigned. For instance, with consumer.poll(Duration.ofSeconds(1)), the partitions did not have time to be assigned before the program moved on to the next method call (i.e. seekToEnd()).

Working code could look something like this:

class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String, String>>) {

    fun run() {
        // Initialization
        val pollDuration = 30L // seconds
        consumer.poll(Duration.ofSeconds(pollDuration)) // Dummy poll to get assigned partitions

        // Seek to end and commit new offset
        consumer.seekToEnd(emptyList())
        consumer.commitSync()

        while (true) {
            val records = consumer.poll(Duration.ofSeconds(pollDuration))
            // perform record analysis and commitSync()
        }
    }
}
Liverbird97
  • 1
    Consumer's method assignment() returns Set, so it's a good idea to replace consumer.seekToEnd(emptyList()) with consumer.seekToEnd(consumer.assignment()) – Daemon2017 Apr 12 '23 at 14:15
  • I have a similar use case and doing 1) fetch topic partitions by the topic 2) `consumer.assign(assign those partitions list)` 3) `consumer.poll(PT20S)` – Sunil Kumar Jun 21 '23 at 09:15
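As the comments suggest, relying on a fixed-length dummy poll is fragile. A variant that avoids guessing the poll duration is to seek at the moment partitions are actually assigned, via a ConsumerRebalanceListener. The sketch below assumes the same manual-commit consumer setup as in the question; the function name is illustrative:

```kotlin
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

fun subscribeFromEnd(consumer: KafkaConsumer<String, String>, topic: String) {
    consumer.subscribe(listOf(topic), object : ConsumerRebalanceListener {
        override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) {
            // Invoked from within poll() as soon as partitions are assigned,
            // so there is no race between assignment and seeking
            consumer.seekToEnd(partitions)
        }

        override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) {
            // Nothing to do here: offsets are committed manually after processing
        }
    })
}
```

With this approach the first regular poll() both triggers the assignment and applies the seek, so no separate dummy poll is needed.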
2

The seekToEnd method needs to know the actual partitions (in Kafka terms, TopicPartitions) on which your consumer should seek to the end.

I am not familiar with the Kotlin API, but checking the JavaDocs for the KafkaConsumer method seekToEnd, you will see that it asks for a collection of TopicPartitions.

As you are currently using emptyList(), seekToEnd falls back to all currently assigned partitions, and before the first poll no partitions have been assigned yet, so the call has no effect, just like you observed.
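A minimal sketch of passing the assigned partitions explicitly, using assignment() after an initial poll has assigned partitions (the function name and poll duration are illustrative):

```kotlin
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import java.time.Duration

fun seekAssignedPartitionsToEnd(consumer: KafkaConsumer<String, String>) {
    // Poll once so the group coordinator assigns partitions to this consumer
    consumer.poll(Duration.ofSeconds(30))

    // assignment() is non-empty only after partitions have been assigned
    val partitions: Set<TopicPartition> = consumer.assignment()

    // Seek each assigned partition to its log-end offset
    consumer.seekToEnd(partitions)
}
```

This makes the intent explicit rather than relying on the empty-collection fallback.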

Michael Heil