0

While using spring kafka I am able to read the messages from the topic based on time stamp with the below code -

                ConsumerRecords<String, String> records = consumer.poll(100);
                if (flag) {
                    Map<TopicPartition, Long> query = new HashMap<>();
                    query.put(new TopicPartition(kafkaTopic, 0), millisecondsFromEpochToReplay);

                    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
                    if(result != null)
                    {
                        records = ConsumerRecords.empty();
                    }

                    result.entrySet().stream()
                            .forEach(entry -> consumer.seek(entry.getKey(), entry.getValue().offset()));

                    flag = false;
                }

How can the same functionality be achieved using spring integration DSL - with KafkaMessageDrivenChannelAdapter? How can we set the Integration Flows and read message from topic based on the timestamp?

Avinash Singh
  • 4,970
  • 8
  • 20
  • 35
Developer
  • 239
  • 1
  • 3
  • 17

1 Answers1

2

Configure the adapter's listener container with a ConsumerAwareRebalanceListener and perform the lookup/seeks when the partitions are assigned.

EDIT

Using Spring Boot (but you can configure the container however you create the container)...

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=so54664761

and

@SpringBootApplication
public class So54664761Application {

    public static void main(String[] args) {
        SpringApplication.run(So54664761Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so54664761", "foo");
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so54664761", 1, (short) 1);
    }

    @Bean
    public IntegrationFlow flow(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> container = container(containerFactory);
        return IntegrationFlows.from(new KafkaMessageDrivenChannelAdapter<>(container))
                .handle(System.out::println)
                .get();
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> container(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("so54664761");
        container.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                System.out.println("Partitions assigned - do the lookup/seeks here");
            }

        });
        return container;
    }

}

and

Partitions assigned - do the lookup/seeks here
GenericMessage [payload=foo, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2f5b2297, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so54664761, kafka_receivedTimestamp=1550241100112}]
Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Example [here](https://stackoverflow.com/questions/52973119/how-to-test-a-consumerawarerebalancelistener/52976119#52976119). – Gary Russell Feb 13 '19 at 16:51
  • Thanks @Gary, I am new to this so need help on understanding how can we inject the ConsumerAwareRebalance Listener in Spring Integration DSL , my flow looks like – Developer Feb 15 '19 at 06:32
  • See the edit to my answer; it uses Boot's auto-configured container factory, but you can add the listener to the container properties however you create the container. – Gary Russell Feb 15 '19 at 14:34
  • Thank you Gary for your help. If we want to have multiple consumer within the same consumer group should we set the concurrency level of ConcurrentMessageListenerContainer? How can we set multiple consumers in the above configuration? – Developer Feb 18 '19 at 10:30
  • When using Spring Boot, `spring.kafka.consumer.concurrency=5`; when creating containers yourself `container.setConcurrency(5)`. – Gary Russell Feb 18 '19 at 17:51