I have a Kafka consumer configured in Spring Boot. Here's the config class:
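import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {
    @Autowired
    private Environment env;

    @Bean
    public ConsumerFactory<String, GenericData.Record> consumerFactory() {
        // Consumer settings, all read from kafka.properties
        Map<String, Object> dataRiverProps = new HashMap<>();
        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));
        dataRiverProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, env.getProperty("schema.registry.url"));
        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}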
And here's the consumer:
@Component
public class KafkaConsumer {
    @Autowired
    private MessageProcessor messageProcessor;

    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeAvro(GenericData.Record message) {
        messageProcessor.process();
    }
}
Please note that I am using topics = "#{'${kafka.topics}'.split(',')}" to pick up the topics from a properties file. And this is what my kafka.properties file looks like:
kafka.topics=pwdChange,pwdCreation
bootstrap.servers=aaa.bbb.com:37900
group.id=pwdManagement
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=30000
schema.registry.url=http://aaa.bbb.com:37800
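To make the topic lookup concrete, here is a minimal plain-Java sketch of what I understand the expression #{'${kafka.topics}'.split(',')} to boil down to: the placeholder is resolved to a string, and that string is split on commas. The class name TopicListSketch and the helper topicsFrom are hypothetical, used only for illustration; this is not how Spring implements it internally.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class TopicListSketch {
    // Hypothetical helper: reads kafka.topics and splits it on commas,
    // roughly mirroring '${kafka.topics}'.split(',') in the listener annotation.
    static List<String> topicsFrom(Properties props) {
        String raw = props.getProperty("kafka.topics", "");
        return Arrays.asList(raw.split(","));
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Same value as in kafka.properties above
        props.load(new StringReader("kafka.topics=pwdChange,pwdCreation\n"));
        System.out.println(topicsFrom(props)); // prints [pwdChange, pwdCreation]
    }
}
```

As far as I can tell, the annotation expression is evaluated when the listener is registered at startup, so the resulting list is fixed from then on.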
Now suppose I want to add a new topic to the subscription, say pwdExpire, and I modify the properties file as follows:
kafka.topics=pwdChange,pwdCreation,pwdExpire
Is there a way for my consumer to start subscribing to this new topic without restarting the server? I found the post "Spring Kafka - Subscribe new topics during runtime", but the Kafka documentation has this to say about metadata.max.age.ms:
The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
It sounds to me like that won't work. Thanks for your help!
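For context, my understanding of the linked post is that it relies on a topic pattern (the topicPattern attribute of @KafkaListener) instead of a fixed topic list, with metadata.max.age.ms controlling how often the pattern is re-checked against the topics that exist. The sketch below is plain Java that only simulates that matching step; it is not Kafka code, and the pattern pwd.* and the names PatternSubscriptionSketch and matchingTopics are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class PatternSubscriptionSketch {
    // Hypothetical helper: returns the topics a pattern would cover, given
    // the topic names known at the time of a metadata refresh.
    static List<String> matchingTopics(Pattern pattern, List<String> knownTopics) {
        List<String> matched = new ArrayList<>();
        for (String topic : knownTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pwdTopics = Pattern.compile("pwd.*"); // assumed naming convention
        // Before pwdExpire exists
        System.out.println(matchingTopics(pwdTopics,
                Arrays.asList("pwdChange", "pwdCreation", "other")));
        // After pwdExpire has been created and metadata is refreshed
        System.out.println(matchingTopics(pwdTopics,
                Arrays.asList("pwdChange", "pwdCreation", "pwdExpire", "other")));
    }
}
```

If that reading is right, a new topic would be picked up only when its name matches the pattern, which is not the same as re-reading kafka.topics from the properties file.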