I have a fairly straightforward Kafka consumer:

MessageListener<String, T> messageListener = record -> {
    doStuff(record.value());
};

startConsumer(messageListener);

protected void startConsumer(MessageListener<String, T> messageListener) {
    ConcurrentMessageListenerContainer<String, T> container = new ConcurrentMessageListenerContainer<>(
            consumerFactory(this.brokerAddress, this.groupId),
            containerProperties(this.topic, messageListener));

    container.start();
}

I can consume messages without any issue. I now need to seek to a specific offset, based on the result of calling offsetsForTimes on the Kafka Consumer.

I understand that I can seek to a certain position using the ConsumerSeekAware interface:

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {

      assignments.forEach((t, o) -> callback.seek(t.topic(), t.partition(), ?????));
}

The problem is that I do not have access to the Kafka Consumer inside the callback, so I have no way to call offsetsForTimes.

Is there any other way to achieve this?

Luciano

1 Answer

Use a ConsumerAwareRebalanceListener (introduced in spring-kafka 2.0) to do the initial seeks. Its callbacks receive the Consumer, so you can call offsetsForTimes there.

The current version is 2.2.0.

See also this related answer: How to test a ConsumerAwareRebalanceListener?
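
As a rough illustration of the suggestion above, here is a minimal sketch of such a listener. The class name SeekToTimestampListener and the startTimestampMs field are made up for this example, and falling back to the end of the partition when no offset is found is just one possible policy, not something the original question specifies.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

// Hypothetical listener: seeks each newly assigned partition to the first
// offset whose record timestamp is >= startTimestampMs.
public class SeekToTimestampListener implements ConsumerAwareRebalanceListener {

    // Assumed to be supplied by the application (epoch milliseconds).
    private final long startTimestampMs;

    public SeekToTimestampListener(long startTimestampMs) {
        this.startTimestampMs = startTimestampMs;
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Ask for the same timestamp on every assigned partition.
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        for (TopicPartition partition : partitions) {
            timestampsToSearch.put(partition, startTimestampMs);
        }

        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampsToSearch);

        for (TopicPartition partition : partitions) {
            OffsetAndTimestamp match = offsets.get(partition);
            if (match != null) {
                consumer.seek(partition, match.offset());
            } else {
                // No record at or after the timestamp in this partition.
                // Assumption for this sketch: start from the end instead.
                consumer.seekToEnd(Collections.singletonList(partition));
            }
        }
    }
}
```

The listener would be registered on the container properties, for example with containerProperties.setConsumerRebalanceListener(new SeekToTimestampListener(timestamp)) before the container is started. Note that onPartitionsAssigned runs on every rebalance, so if the seek should happen only once per partition, the listener needs to track which partitions it has already positioned.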

Gary Russell
  • I added a link to another answer. – Gary Russell Nov 07 '18 at 16:36
  • Hi Gary, I'm using the Apache Kafka consumer and my offsetsForTimes call returns offset positions for only a few partitions, even though all partitions are assigned. I checked that ConsumerRebalanceListener.onPartitionsAssigned receives all partitions, but when I call offsetsForTimes for all of them inside onPartitionsAssigned, it returns offset positions for only 1 or 2 partitions. Do you have any idea what the problem is? I posted a question on this too. – PrabaharanKathiresan Dec 06 '19 at 06:20
  • It's best not to ask new questions in comments, but I [found your other question](https://stackoverflow.com/questions/59200574). In future if you comment on an old answer, add a link to the new question. – Gary Russell Dec 06 '19 at 14:51
  • Thanks Gary. Sure will do in future. – PrabaharanKathiresan Dec 07 '19 at 16:30