While using spring kafka
I am able to read the messages from the topic based on time stamp with the below code -
ConsumerRecords<String, String> records = consumer.poll(100);
if (flag) {
Map<TopicPartition, Long> query = new HashMap<>();
query.put(new TopicPartition(kafkaTopic, 0), millisecondsFromEpochToReplay);
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
if(result != null)
{
records = ConsumerRecords.empty();
}
result.entrySet().stream()
.forEach(entry -> consumer.seek(entry.getKey(), entry.getValue().offset()));
flag = false;
}
How can the same functionality be achieved using spring integration DSL - with KafkaMessageDrivenChannelAdapter
?
How can we set the Integration Flows and read message from topic based on the timestamp?