
I have a Kafka consumer configured in Spring Boot. Here's the config class:

@EnableKafka
@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConsumerFactory<String, GenericData.Record> consumerFactory() {

        Map<String, Object> dataRiverProps = new HashMap<>();
        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));

        dataRiverProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, env.getProperty("schema.registry.url"));
        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());

        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

And here's the consumer:

@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;

    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeAvro(GenericData.Record message) {
        messageProcessor.process();
    }

}

Please note that I am using topics = "#{'${kafka.topics}'.split(',')}" to pick up the topics from a properties file. And this is what my kafka.properties file looks like:

kafka.topics=pwdChange,pwdCreation
bootstrap.servers=aaa.bbb.com:37900
group.id=pwdManagement
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=30000
schema.registry.url=http://aaa.bbb.com:37800

Now if I am to add a new topic to the subscription, say pwdExpire, and modify the prop files as follows:

kafka.topics=pwdChange,pwdCreation,pwdExpire

Is there a way for my consumer to start subscribe to this new topic without restarting the server? I have found this post Spring Kafka - Subscribe new topics during runtime, but the documentation has this to say about metadata.max.age.ms:

The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

It sounds to me like that won't work. Thanks for your help!

Hua
  • Some more information: I have a scheduled task that runs every 5 minutes to check the DB. So if we add some control value in the DB, is there a way to force the program to re-read the properties file and adjust its consumption accordingly? Thanks! – Hua Jan 31 '19 at 18:06
  • You could look at Spring Cloud Config and use the @RefreshScope annotation. I am not sure whether this would work with the @Configuration-annotated classes, though. – Matt Jan 31 '19 at 18:17
  • So basically you could combine the Spring Cloud Config refresh with this post on reinitializing the Spring beans for the producer and consumer factory: https://stackoverflow.com/questions/51218086/how-to-reinitialize-a-spring-bean. That might work for you. – Matt Jan 31 '19 at 18:25
  • 1
    I don't think refresh scope will work here; AFAIK, refresh scope only works with passive beans; the listener container is an active component (implements `SmartLifecycle` and is started/stopped by the application context. – Gary Russell Jan 31 '19 at 18:34
  • I don't have the app running on spring cloud. – Hua Jan 31 '19 at 18:58

1 Answer


No; the only way to do that is to use a topic pattern; as new topics that match the pattern are added, the consumer will pick them up on its next metadata refresh, after 5 minutes by default.
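
For example, a pattern-based version of the listener in the question might look like this. This is a minimal sketch: the pwd.* regex and the metadata.max.age.ms override are assumptions based on the topic names above, not code from the original post.

@KafkaListener(topicPattern = "pwd.*", containerFactory = "kafkaListenerContainerFactory")
public void consumeAvro(GenericData.Record message) {
    messageProcessor.process();
}

You could also shorten the metadata refresh interval in consumerFactory() so that new matching topics are discovered sooner than the 5-minute default:

// Assumption: a 1-minute refresh is acceptable for your use case; tune as needed.
dataRiverProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "60000");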

You can, however, add new listener container(s) for the new topic(s) at runtime.
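
A rough sketch of that approach, reusing the consumerFactory bean from the question; the class name, method name, and inline listener body are illustrative assumptions, and this assumes spring-kafka 2.2 (in earlier versions ContainerProperties lives in org.springframework.kafka.listener.config):

import org.apache.avro.generic.GenericData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class DynamicTopicRegistrar {

    @Autowired
    private ConsumerFactory<String, GenericData.Record> consumerFactory;

    @Autowired
    private MessageProcessor messageProcessor;

    public void addTopicListener(String topic) {
        ContainerProperties containerProps = new ContainerProperties(topic);
        // group.id, deserializers, etc. come from the consumerFactory properties
        containerProps.setMessageListener((MessageListener<String, GenericData.Record>) record ->
                messageProcessor.process());
        ConcurrentMessageListenerContainer<String, GenericData.Record> container =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
        container.start(); // begins consuming from the new topic immediately
    }
}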

Another option would be to load the @KafkaListener bean in a child application context and re-create the context each time the topic(s) change.
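
A minimal sketch of the child-context idea, where ListenerConfig is a hypothetical @Configuration class that declares the @KafkaListener bean:

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class ListenerContextManager {

    private AnnotationConfigApplicationContext childContext;

    // Close the old child context (stopping its listener containers) and
    // build a fresh one that reads the current kafka.topics value.
    public void recreateListenerContext(ApplicationContext parent) {
        if (childContext != null) {
            childContext.close();
        }
        childContext = new AnnotationConfigApplicationContext();
        childContext.setParent(parent);
        childContext.register(ListenerConfig.class); // hypothetical listener configuration
        childContext.refresh();
    }
}

The scheduled DB-check task mentioned in the comments could call a method like this whenever the control value changes.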

EDIT

See the javadocs for KafkaConsumer.subscribe(Pattern pattern)...

/**
 * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
 * The pattern matching will be done periodically against topics existing at the time of check.
 * <p>
 ...
Gary Russell
  • Thanks! Are there examples that I can refer to? – Hua Jan 31 '19 at 18:59
  • @GaryRussell would you mind adding an example of pattern matching? Also, to clarify, I guess what you are saying is that once a topic is created on the brokers, as long as it matches the pattern the consumer is looking for, the consumer will pick it up within 5 minutes. Is this 5-minute timer a configurable property that's set by default on the consumer factory? – Matt Jan 31 '19 at 19:00
  • 1
    @Matt See the `topicPattern` property on `@KafkaListener`; it's a regex e.g. `pwd.*` for all topics starting with `pwd`. The 5 minutes is the default `metadata.max.age.ms` which is a consumer property (see the kafka documentation). `The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.` @Hua an example of which option? – Gary Russell Jan 31 '19 at 19:07
  • @GaryRussell Thanks for the reply! – Matt Jan 31 '19 at 19:21
  • Thanks @GaryRussell, I am interested in Option 1. I just checked the API of KafkaListener, and honestly I don't see much difference between "topics" and "topicPattern". For "topics" -- The topics for this listener. The entries can be 'topic name', 'property-placeholder keys' or 'expressions'. An expression must be resolved to the topic name. For "topicPattern" -- The topic pattern for this listener. The entries can be 'topic name', 'property-placeholder keys' or 'expressions'. An expression must be resolved to the topic pattern. – Hua Jan 31 '19 at 20:25
  • Yes, the javadocs there need some improvement; they assume you are familiar with the Kafka APIs. See the javadocs for `KafkaConsumer.subscribe()` instead - I added them to the answer. – Gary Russell Jan 31 '19 at 20:35
  • @GaryRussell Thanks! – Hua Jan 31 '19 at 20:51