I have a fairly straightforward Kafka consumer:
MessageListener<String, T> messageListener = record -> {
doStuff( record.value()));
};
startConsumer(messageListener);
protected void startConsumer(MessageListener<String, T> messageListener) {
ConcurrentMessageListenerContainer<String, T> container = new ConcurrentMessageListenerContainer<>(
consumerFactory(this.brokerAddress, this.groupId),
containerProperties(this.topic, messageListener));
container.start();
}
I can consume messages without any issue.
Now, I have the requirement to seek
from a specific offset based on the result of a call to offsetsForTimes
on the Kafka Consumer.
I understand that I can seek
to a certain position using the ConsumerSeekAware
interface:
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.forEach((t, o) -> callback.seek(t.topic(), t.partition(), ?????));
}
The problem now, is that I do not have access to the Kafka Consumer inside the callback, therefore I have no way to call offsetsForTimes
.
Is there any other way to achieve this?