
I have a requirement for a Spring Boot REST service that a client application will call every 30 minutes. The service should return:

  1. the latest messages, based on the count specified in a query param, e.g. http://messages.com/getNewMessages?number=10 should return the 10 latest messages

  2. messages based on a count and offset specified in query params, e.g. http://messages.com/getSpecificMessages?number=5&start=123 should return 5 messages starting at offset 123.
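For requirement 1, the service has to translate "latest N" into a concrete seek position: end offset minus N, clamped so it never falls before the beginning of the partition. A minimal sketch of that calculation (class and method names are hypothetical; the inputs would come from `consumer.beginningOffsets(...)` and `consumer.endOffsets(...)`):

```java
public class OffsetMath {
    /**
     * Computes where to seek so that polling returns the latest {@code count}
     * messages of a partition.
     *
     * @param beginningOffset earliest offset still available in the partition
     * @param endOffset       next offset to be written (one past the last record)
     * @param count           number of latest messages requested
     * @return the offset to pass to consumer.seek(...)
     */
    public static long seekOffsetForLatest(long beginningOffset, long endOffset, long count) {
        long target = endOffset - count;
        // Clamp: the partition may hold fewer than 'count' messages.
        return Math.max(target, beginningOffset);
    }
}
```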

I have a simple standalone application and it works fine. Here is what I tested, and I would like some direction on incorporating it into the service.

public static void main(String[] args) {
        // create kafka consumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-first-consumer-group");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // args[0] = number of messages to return per poll
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.parseInt(args[0]));

        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        // subscribe to topic; the deprecated poll(0) blocks until partitions are assigned
        consumer.subscribe(Collections.singleton("test"));
        consumer.poll(0);

        // seek every assigned partition to the requested start offset (args[1])
        for (TopicPartition partition : consumer.assignment())
            consumer.seek(partition, Long.parseLong(args[1]));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));

        System.out.println("Total Record Count ******* : " + records.count());

        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Message: " + record.value());
            System.out.println("Message offset: " + record.offset());
            System.out.println("Message timestamp: " + record.timestamp());
            Date date = new Date(record.timestamp());
            Format format = new SimpleDateFormat("yyyy MM dd HH:mm:ss.SSS");
            System.out.println("Message date: " + format.format(date));
        }
        consumer.commitSync();
        consumer.close();
    }

As my consumer will be on-demand, I'm wondering how I can achieve this in a Spring Boot service. Where do I specify the properties? If I put them in application.properties they get injected at startup, but how do I control MAX_POLL_RECORDS_CONFIG at runtime? Any help appreciated.
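One option, given the service is only hit every 30 minutes, is to skip a long-lived listener entirely and build a short-lived KafkaConsumer per request, so `max.poll.records` comes from the query param instead of application.properties. A sketch of the per-request config, using plain string keys so it stands alone without the kafka-clients constants (broker address and group id are placeholders):

```java
import java.util.Properties;

public class ConsumerProps {
    // Builds consumer properties for a single on-demand request; maxPollRecords
    // comes from the REST query param rather than application.properties.
    public static Properties forRequest(int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "on-demand-consumer");      // placeholder
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.offset.reset", "earliest");
        // The runtime-controlled part: cap records per poll at the requested count.
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }
}
```

The request handler would then do `new KafkaConsumer<>(ConsumerProps.forRequest(number))`, seek, poll, and close the consumer before returning.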

Developer

2 Answers


The answer to your question is here, and the answer with a code example is this answer. Both were written by the excellent Gary Russell, one of the main people behind Spring Kafka.

TL;DR:

If you want to arbitrarily rewind the partitions at runtime, have your listener implement ConsumerSeekAware and grab a reference to the ConsumerSeekCallback.
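The shape of that approach, sketched with a stand-in for Spring Kafka's `ConsumerSeekAware.ConsumerSeekCallback` so the logic reads standalone (in a real application you would implement `org.springframework.kafka.listener.ConsumerSeekAware` and Spring Kafka supplies the callback itself):

```java
public class SeekAwareListener {

    // Stand-in for ConsumerSeekAware.ConsumerSeekCallback; the real one is
    // handed to the listener by Spring Kafka at container startup.
    public interface SeekCallback {
        void seek(String topic, int partition, long offset);
    }

    private SeekCallback callback;

    // Spring Kafka calls the real registerSeekCallback once per consumer thread;
    // keep the reference so it can be used later, outside the listener method.
    public void registerSeekCallback(SeekCallback callback) {
        this.callback = callback;
    }

    // On-demand rewind, e.g. triggered from a REST endpoint.
    public void rewindTo(String topic, int partition, long offset) {
        callback.seek(topic, partition, offset);
    }
}
```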

jumping_monkey

MAX_POLL_RECORDS_CONFIG only affects how many records the kafka client hands back to your Spring service per poll; it does not reduce the bytes the consumer fetches from the kafka server.

[image: diagram of a fetched offset range, offsets 110 to 220]

See the picture above: no matter whether your start offset is 150 or 190, the kafka server will return the whole fetched range (offset 110 to offset 190). The kafka server doesn't even know how many records it is returning to the consumer; it only knows the byte size (220 - 110).

So I think you have to control the record count yourself; currently it is handled by the kafka client jar, and either way the fetched records occupy your JVM's local memory.
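Since the broker returns the whole fetched range regardless, the service can enforce the exact count itself after polling. A trivial sketch, with a plain list standing in for the polled records:

```java
import java.util.List;

public class RecordLimiter {
    // Keeps only the first 'limit' polled records; the broker may hand back
    // more than the caller asked for, so the service trims client-side.
    public static <T> List<T> firstN(List<T> polled, int limit) {
        return polled.subList(0, Math.min(limit, polled.size()));
    }
}
```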

clevertension