
I have a Spring Boot Kafka application. My brokers are recycled every few days: the old brokers are deprovisioned and new brokers are provisioned.

I have a scheduler which checks for brokers every few hours. As soon as we have new brokers, I would like to reload all the Spring Kafka related beans. This is very similar to what KafkaAutoConfiguration does, except that I want to trigger it when the broker value changes and load the auto-configuration programmatically.

How do I invoke the auto-configuration programmatically whenever the old brokers are replaced with new ones?

s7vr
  • See if any of these help? https://stackoverflow.com/questions/51218086/how-to-reinitialize-a-spring-bean, https://www.oodlestechnologies.com/blogs/Reload-External-Configuration-on-Runtime-in-Spring-Boot/, http://www.tothenew.com/blog/loading-and-removing-bean-at-run-time-in-spring-application/, https://stackoverflow.com/questions/27998502/java-spring-recreate-specific-bean – Tarun Lalwani Jul 08 '19 at 02:25

1 Answer


Your requirements sound like the Config Server in Spring Cloud: https://cloud.spring.io/spring-cloud-static/Greenwich.SR2/multi/multi__spring_cloud_config_2.html#_spring_cloud_config_2 together with its @RefreshScope feature: https://cloud.spring.io/spring-cloud-static/Greenwich.SR2/multi/multi__spring_cloud_context_application_context_services.html#refresh-scope.

So, you need to specify your own beans and mark them with that annotation:

// `properties` is the injected KafkaProperties, as in KafkaAutoConfiguration.
@Bean
@RefreshScope
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}

@Bean
@RefreshScope
public ProducerFactory<?, ?> kafkaProducerFactory() {
    DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
            this.properties.buildProducerProperties());
    String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
    if (transactionIdPrefix != null) {
        factory.setTransactionIdPrefix(transactionIdPrefix);
    }
    return factory;
}

These two beans rely on the configuration properties for the connection to the Apache Kafka broker, and that is enough to make them refreshable. Whenever a RefreshEvent happens, these beans are going to be re-initialized with fresh configuration properties.

I think the components that use the ConsumerFactory (the MessageListenerContainer instances, which are managed by the KafkaListenerEndpointRegistry) have to be restarted on that event as well. The point is that a MessageListenerContainer starts a long-running process and therefore caches a KafkaConsumer instance for polling.

The components that use the ProducerFactory don't need to be restarted. Even if a KafkaProducer is cached in the DefaultKafkaProducerFactory, it is going to be re-created when the @RefreshScope bean is refreshed.

UPDATE

I don’t use config server. I get the new hosts from consul catalog service.

Right, I didn't say that you use a Config Server. Your case just looks similar to me. So, at a high level, I would take a look at how a Config Client is implemented and do something comparable for your Consul catalog solution.

Nevertheless, you can still emit a RefreshEvent, which will trigger all your @RefreshScope'd beans to be reloaded. For that purpose you need to implement ApplicationEventPublisherAware and emit that event whenever you get an update from Consul. Remember: Kafka listener containers must be restarted. For that you can listen for the RefreshScopeRefreshedEvent, since you are only interested in the restart once all the @RefreshScope beans have been refreshed.
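A rough sketch of the scheduler side, using only the JDK, so that the event is emitted only when the broker set really changes. The class `BrokerChangeDetector`, its `check` method and the `onChange` callback are hypothetical names for illustration, not part of Spring. As far as I know, the RefreshEvent payload is not applied to the configuration by itself: the refreshed beans re-read the Environment, so I am assuming the callback first makes the new list visible there (for example as `spring.kafka.bootstrap-servers`) and only then publishes the event.

```java
import java.util.Collection;
import java.util.TreeSet;
import java.util.function.Consumer;

/**
 * Hypothetical helper: remembers the last broker set that was seen and
 * invokes a callback only when the set actually changes.
 */
final class BrokerChangeDetector {

    // Receives the new brokers as a comma-separated list (assumed callback contract).
    private final Consumer<String> onChange;
    private TreeSet<String> lastSeen;

    BrokerChangeDetector(Collection<String> initialBrokers, Consumer<String> onChange) {
        this.lastSeen = new TreeSet<>(initialBrokers);
        this.onChange = onChange;
    }

    /**
     * Called by the scheduler with the addresses read from the catalog.
     * Returns true if a change was reported to the callback.
     */
    synchronized boolean check(Collection<String> currentBrokers) {
        // A sorted set makes the comparison independent of order and duplicates.
        TreeSet<String> current = new TreeSet<>(currentBrokers);
        if (current.isEmpty() || current.equals(lastSeen)) {
            return false; // nothing usable, or nothing new
        }
        lastSeen = current;
        // In a Spring application the callback would update the property source
        // and then publish the RefreshEvent (assumption, not shown here).
        onChange.accept(String.join(",", current));
        return true;
    }
}
```

In the application, the callback passed to the constructor would be the place to call `publishEvent(new RefreshEvent(this, brokers, "new brokers"))` on the injected ApplicationEventPublisher. Ignoring an empty result is a design choice here: a failed or empty catalog lookup should probably not tear down working connections.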

More about refresh scope: https://gist.github.com/dsyer/a43fe5f74427b371519af68c5c4904c7

Artem Bilan
  • Thank you Artem. I don’t use config server. I get the new hosts from consul catalog service. Is it possible to invoke the context refresh event programmatically from the scheduler ? I’m using RefreshScope already in my project. How do we differentiate this RefreshScope from others ? – s7vr Jul 09 '19 at 09:50
  • Please, see an UPDATE in my answer. – Artem Bilan Jul 09 '19 at 14:29
  • Thanks again - So I finally got around to working on this - Regarding this _Nevertheless you still can emit a RefreshEvent which will trigger all your @RefreshScope'd beans to be reloaded._ how does the refresh scope know what to update for the factory? All that changed was the brokers list - meaning, how does `new RefreshEvent(this, brokers, "new brokers")` translate to a new factory? I'm missing that link. – s7vr Mar 12 '20 at 02:19
  • I think you need to start a new SO thread. It is kinda lost already in this old one - more than a year ago... – Artem Bilan Mar 12 '20 at 02:26