Your requirements sound like the Config Server in Spring Cloud: https://cloud.spring.io/spring-cloud-static/Greenwich.SR2/multi/multi__spring_cloud_config_2.html#_spring_cloud_config_2 with its @RefreshScope feature: https://cloud.spring.io/spring-cloud-static/Greenwich.SR2/multi/multi__spring_cloud_context_application_context_services.html#refresh-scope.
So, you need to declare your own beans and mark them with that annotation:
    @Bean
    @RefreshScope
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
    }

    @Bean
    @RefreshScope
    public ProducerFactory<?, ?> kafkaProducerFactory() {
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
                this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        return factory;
    }
These two beans rely on the configuration properties for the connection to the Apache Kafka broker, and that is enough to make them refreshable. Whenever a RefreshEvent is handled, these beans are going to be re-initialized with fresh configuration properties.
I think the consumers of the ConsumerFactory (MessageListenerContainer and KafkaListenerEndpointRegistry) have to be restarted on that event as well. The point is that a MessageListenerContainer starts a long-lived process and therefore caches a KafkaConsumer instance for polling.
None of the ProducerFactory consumers need to be restarted. Even if a KafkaProducer is cached in the DefaultKafkaProducerFactory, it is going to be reinitialized during the @RefreshScope phase.
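To illustrate why the listener containers need a restart while the ProducerFactory consumers do not, here is a small dependency-free model. It is only a sketch and not Spring's implementation: Refreshable, ClientFactory, ListenerContainer and Sender are hypothetical stand-ins for the refresh-scope proxy, a Kafka client factory, a listener container and a template-style caller, and the host names are made up.

```java
import java.util.function.Supplier;

// Dependency-free model; every name here is hypothetical and none of it is Spring API.
final class RefreshScopeModel {

    /** Stands in for a @RefreshScope proxy: the target is rebuilt on first use after refresh(). */
    static final class Refreshable<T> {
        private final Supplier<T> creator;
        private T target;

        Refreshable(Supplier<T> creator) {
            this.creator = creator;
        }

        synchronized T get() {
            if (target == null) {
                target = creator.get();
            }
            return target;
        }

        synchronized void refresh() {
            target = null; // the next get() re-creates the target from current configuration
        }
    }

    /** Stands in for a ConsumerFactory or ProducerFactory bound to one broker address. */
    static final class ClientFactory {
        private final String bootstrapServers;

        ClientFactory(String bootstrapServers) {
            this.bootstrapServers = bootstrapServers;
        }

        String createClient() {
            return "client@" + bootstrapServers;
        }
    }

    /** Stands in for a listener container: it obtains its consumer once, in start(). */
    static final class ListenerContainer {
        private final Refreshable<ClientFactory> factory;
        private String consumer;

        ListenerContainer(Refreshable<ClientFactory> factory) {
            this.factory = factory;
        }

        void start() {
            consumer = factory.get().createClient();
        }

        void stop() {
            consumer = null;
        }

        String currentConsumer() {
            return consumer;
        }
    }

    /** Stands in for a caller that goes through the factory on every send. */
    static final class Sender {
        private final Refreshable<ClientFactory> factory;

        Sender(Refreshable<ClientFactory> factory) {
            this.factory = factory;
        }

        String producerForSend() {
            return factory.get().createClient();
        }
    }

    public static void main(String[] args) {
        String[] brokers = {"old-host:9092"};
        Refreshable<ClientFactory> factory =
                new Refreshable<>(() -> new ClientFactory(brokers[0]));
        ListenerContainer container = new ListenerContainer(factory);
        Sender sender = new Sender(factory);
        container.start();

        brokers[0] = "new-host:9092"; // the configuration changes
        factory.refresh();            // the refresh scope drops the cached target

        System.out.println(sender.producerForSend());    // client@new-host:9092
        System.out.println(container.currentConsumer()); // client@old-host:9092 (stale)
        container.stop();
        container.start();
        System.out.println(container.currentConsumer()); // client@new-host:9092
    }
}
```

The sender picks up the new factory on its next call, while the container keeps the consumer it obtained in start() until it is stopped and started again.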
UPDATE
"I don’t use config server. I get the new hosts from consul catalog service."
Right, I didn't say that you use a Config Server. It just looks similar to me. So, from a high level, I would really take a look at a Config Client implementation for your Consul catalog solution.
Nevertheless, you can still emit a RefreshEvent, which will trigger all your @RefreshScope'd beans to be reloaded. For that purpose you need to implement ApplicationEventPublisherAware and emit that event whenever you have an update from Consul. Remember: Kafka listener containers must be restarted. For that purpose you can listen for the RefreshScopeRefreshedEvent, since you are really interested in the restart only when all the @RefreshScope beans have been refreshed.
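A minimal sketch of that wiring could look like the following. It needs Spring Cloud Context and Spring for Apache Kafka on the classpath. The class name ConsulKafkaRefresher and the onBrokerHostsChanged hook are hypothetical; how you detect the change in the Consul catalog is up to you. It also assumes the new hosts are already visible through the Environment (for example as spring.kafka.bootstrap-servers) by the time the event is published, otherwise the re-created factories would be built from the old values.

```java
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
import org.springframework.cloud.endpoint.event.RefreshEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;

// Hypothetical component name; a sketch, not a drop-in implementation.
@Component
public class ConsulKafkaRefresher implements ApplicationEventPublisherAware {

    private final KafkaListenerEndpointRegistry registry;

    private ApplicationEventPublisher publisher;

    public ConsulKafkaRefresher(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    // Hypothetical hook: call it from whatever code notices new hosts in the Consul catalog.
    public void onBrokerHostsChanged(String newHosts) {
        this.publisher.publishEvent(
                new RefreshEvent(this, newHosts, "Kafka broker hosts changed"));
    }

    // Published once the refresh scope has been cleared, so the factories
    // are re-created from the current configuration on their next use.
    @EventListener
    public void onRefreshScopeRefreshed(RefreshScopeRefreshedEvent event) {
        this.registry.stop();
        this.registry.start();
    }
}
```

Stopping and starting the whole registry is the bluntest option. If only some listeners talk to the affected broker, you could instead iterate over registry.getListenerContainers() and restart just those.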
More about refresh scope: https://gist.github.com/dsyer/a43fe5f74427b371519af68c5c4904c7