1

I have a spring boot application that needs to connect N number of Kafka clusters. based on some condition Kafka template need to switch and send a message

I have seen some solutions to create separate Kafka template beans but in my use case number of clusters will change at the deployment time

ex:

@Bean(name = "cluster1")
public KafkaTemplate<String, String> kafkaTemplatesample1() {
    return new KafkaTemplate<>(devProducerFactory1());
}

@Bean(name = "cluster2")
public KafkaTemplate<String, String> kafkaTemplatesample2() {
    return new KafkaTemplate<>(devProducerFactory2());
}

is there any other solution for this? if you can share a sample code its much appreciated

1 Answers1

0

Let's assume that each cluster can be described with the following attributes:

@Getter
@Setter
public class KafkaCluster {
  private String beanName;
  private List<String> bootstrapServers;
}

For example, two clusters are defined in the application.properties:

kafka.clusters[0].bean-name=cluster1
kafka.clusters[0].bootstrap-servers=CLUSTER_1_URL
kafka.clusters[1].bean-name=cluster2
kafka.clusters[1].bootstrap-servers=CLUSTER_2_URL

Those properties are needed before beans are instantiated, to register KafkaTemplate beans' definitions, which makes @ConfigurationProperties unsuitable for this case. Instead, Binder API is used to bind them programmatically.

KafkaTemplate beans' definitions can be registered in the implementation of BeanDefinitionRegistryPostProcessor interface.

public class KafkaTemplateDefinitionRegistrar implements BeanDefinitionRegistryPostProcessor {

  private final List<KafkaCluster> clusters;

  public KafkaTemplateDefinitionRegistrar(Environment environment) {
    clusters= Binder.get(environment)
        .bind("kafka.clusters", Bindable.listOf(KafkaCluster.class))
        .orElseThrow(IllegalStateException::new);
  }

  @Override
  public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
    clusters.forEach(cluster -> {
      GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
      beanDefinition.setBeanClass(KafkaTemplate.class);
      beanDefinition.setInstanceSupplier(() -> kafkaTemplate(cluster));
      registry.registerBeanDefinition(cluster.getBeanName(), beanDefinition);
    });
  }

  @Override
  public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
  }

  public ProducerFactory<String, String> producerFactory(KafkaCluster kafkaCluster) {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        kafkaCluster.getBootstrapServers());
    configProps.put(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);
    configProps.put(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
  }

  public KafkaTemplate<String, String> kafkaTemplate(KafkaCluster kafkaCluster) {
    return new KafkaTemplate<>(producerFactory(kafkaCluster));
  }
}

Configuration class for the KafkaTemplateDefinitionRegistrar bean:

@Configuration
public class KafkaTemplateDefinitionRegistrarConfiguration {
  @Bean
  public static KafkaTemplateDefinitionRegistrar beanDefinitionRegistrar(Environment environment) {
    return new KafkaTemplateDefinitionRegistrar(environment);
  }
}

Additionally, exclude KafkaAutoConfiguration in the main class to prevent creating the default KafkaTemplate bean. This is probably not the best way because all the other KafkaAutoConfiguration beans are not created in that case.

@SpringBootApplication(exclude={KafkaAutoConfiguration.class})

Finally, below is a simple test that proves the existence of two KafkaTemplate beans.

@SpringBootTest
class SpringBootApplicationTest {
    @Autowired
    List<KafkaTemplate<String,String>> kafkaTemplates;
    
    @Test
    void kafkaTemplatesSizeTest() {
        Assertions.assertEquals(kafkaTemplates.size(), 2);
    }
}

For reference: Create N number of beans with BeanDefinitionRegistryPostProcessor, Spring Boot Dynamic Bean Creation From Properties File

Toni
  • 3,296
  • 2
  • 13
  • 34