I am creating a Spring Boot application that needs to create multiple Kafka topics. The list of topic names and configurations comes from a .csv file. The code below works, but it declares only a single topic, and writing one bean method per topic is not practical for a list read at runtime. Is there a way to create multiple topics using Spring?

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    NewTopic topic = new NewTopic(String.format("topic%d", 1), 10, (short) 1);
    Map<String, String> extraTopicConfig = new HashMap<>();
    extraTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact");
    extraTopicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1");
    topic.configs(extraTopicConfig);
    return topic;
}
OneCricketeer
Naman Kumar

4 Answers

I came upon this old question looking for an answer. I solved it like this:

@Configuration
public class TopicCreation {
  final String[] topicNames = new String[] {"topic1", "topic2"};
  final SingletonBeanRegistry beanRegistry;

  public TopicCreation(GenericApplicationContext context) {
    this.beanRegistry = context.getBeanFactory();
  }

  @PostConstruct
  public void createTopics() {
    for (String topic : topicNames) {
      NewTopic newTopic = TopicBuilder.name(topic)
          .replicas(1)
          .partitions(1)
          .build();
      beanRegistry.registerSingleton("topic-" + topic, newTopic);
    }
  }
}
Lasse L

For anyone struggling to create multiple topics programmatically, or to create topics on different brokers during start-up, consider using an ApplicationRunner, similar to what is described in the accepted answer here.

Konstantin Grigorov

Better way to do it nowadays:

Spring Kafka already provides a class for this exact purpose: KafkaAdmin.NewTopics, in the org.springframework.kafka.core package (available since version 2.7). Its constructor accepts a variadic argument of type NewTopic, and every topic passed to it is created.

Like this:

@Value("${topic.names}")
private List<String> topicNames; // <-- initialize your list of topic names, or read from properties

@Bean
public KafkaAdmin.NewTopics createTopics() {
  List<NewTopic> topics = new ArrayList<>();
  for (String name : topicNames) {
    topics.add(TopicBuilder.name(name).partitions(1).replicas(1).build());
  }
  return new KafkaAdmin.NewTopics(topics.toArray(new NewTopic[0])); // <-- pass list as varargs
}
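
Since the question reads its topic definitions from a .csv file, here is a minimal sketch of the parsing step using only the JDK. The column layout (name,partitions,replicas, plus an optional fourth column of key=value pairs separated by semicolons) and the names TopicCsv and TopicSpec are assumptions made for illustration, not anything from the question. Quoted fields and embedded commas are not handled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: parses rows shaped like name,partitions,replicas[,key=value;key=value]. */
final class TopicCsv {

    /** Plain holder for one CSV row; this is not a Kafka or Spring type. */
    static final class TopicSpec {
        final String name;
        final int partitions;
        final short replicas;
        final Map<String, String> configs;

        TopicSpec(String name, int partitions, short replicas, Map<String, String> configs) {
            this.name = name;
            this.partitions = partitions;
            this.replicas = replicas;
            this.configs = configs;
        }
    }

    static List<TopicSpec> parse(List<String> lines) {
        List<TopicSpec> specs = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comment lines
            }
            // Limit of 4 keeps the whole config column together in cols[3].
            String[] cols = line.split(",", 4);
            if (cols.length < 3) {
                throw new IllegalArgumentException("Expected name,partitions,replicas: " + line);
            }
            Map<String, String> configs = new LinkedHashMap<>();
            if (cols.length == 4 && !cols[3].trim().isEmpty()) {
                for (String pair : cols[3].split(";")) {
                    String[] kv = pair.split("=", 2);
                    if (kv.length != 2) {
                        throw new IllegalArgumentException("Expected key=value: " + pair);
                    }
                    configs.put(kv[0].trim(), kv[1].trim());
                }
            }
            specs.add(new TopicSpec(
                    cols[0].trim(),
                    Integer.parseInt(cols[1].trim()),
                    Short.parseShort(cols[2].trim()),
                    configs));
        }
        return specs;
    }
}
```

Each TopicSpec could then be turned into a NewTopic inside the bean method, for example with TopicBuilder.name(spec.name).partitions(spec.partitions).replicas(spec.replicas).configs(spec.configs).build(), and the resulting array passed to the KafkaAdmin.NewTopics constructor. The lines themselves might come from Files.readAllLines on whatever path the application is configured with.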
Rodrigo Rodrigues

Multiple topics in one bean using Spring Boot 2.7

application.yml

adapter-app-config:
  topics:
    -
      name: pisa.adapterborder.dcntoc.test
      partitions: 1
      replicas: 2
    -
      name: pisa.adapterborder.valueindividual.test
      partitions: 1
      replicas: 2

@ConfigurationProperties(prefix = "adapter-app-config")
@Data
public class AdapterConfig {

    private List<Topic> topics;


    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Topic {

        private String name;
        private Integer partitions;
        private Integer replicas;
    }

}

@Configuration
@Slf4j
public class ApplicationConfiguration {

    @Autowired
    private AdapterConfig adapterConfig;


    @Bean
    public KafkaAdmin.NewTopics createKafkaTopics() {
        // Build one NewTopic per entry under adapter-app-config.topics
        List<NewTopic> topics = new ArrayList<>();
        adapterConfig.getTopics().forEach(topic -> topics.add(
                TopicBuilder.name(topic.getName())
                        .partitions(topic.getPartitions())
                        .replicas(topic.getReplicas())
                        .build()));

        return new KafkaAdmin.NewTopics(topics.toArray(NewTopic[]::new));
    }

}
JCompetence