14

I need to configure the retention policy of a particular topic when it is created. I looked for a solution but could only find the command-line alter command below:

./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config retention.ms=1680000

Is there a way to configure it at creation time, for example through XML or properties-file configuration in Spring MVC?

Archimedes Trajano
Rachan R K

3 Answers

18

Spring Kafka lets you create new topics by declaring NewTopic @Beans in your application context. This requires a bean of type KafkaAdmin in the application context, which is created automatically if you are using Spring Boot. You could define your topic as follows:

@Bean
public NewTopic myTopic() {
    return TopicBuilder.name("my-topic")
            .partitions(4)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
            .build();
}
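
The retention.ms value is given in milliseconds, so 1680000 corresponds to 28 minutes. As a minimal sketch, the string could be derived from a java.time.Duration instead of being hard-coded. The class and method names here (RetentionValues, retentionMs) are hypothetical and not part of Spring Kafka or the Kafka client:

```java
import java.time.Duration;

// Hypothetical helper, not part of Spring Kafka or kafka-clients.
public class RetentionValues {

    // Converts a Duration into the millisecond string expected by retention.ms.
    public static String retentionMs(Duration retention) {
        return Long.toString(retention.toMillis());
    }

    public static void main(String[] args) {
        // 28 minutes is the value used in the question and in the bean above.
        System.out.println(retentionMs(Duration.ofMinutes(28))); // prints 1680000
    }
}
```

In the bean definition this would be used as .config(TopicConfig.RETENTION_MS_CONFIG, RetentionValues.retentionMs(Duration.ofMinutes(28))).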

If you are not using Spring Boot, you'll additionally have to define the KafkaAdmin bean:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    return new KafkaAdmin(configs);
}

If you want to edit the configuration of an existing topic, you'll have to use the AdminClient. Here's a snippet that changes retention.ms at the topic level:

Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// try-with-resources closes the client when done
try (AdminClient client = AdminClient.create(config)) {
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");

    // Update the retention.ms value
    ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
    AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);

    Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(1);
    configs.put(resource, Arrays.asList(op));

    // all() returns a future; get() waits for the broker and surfaces any error
    // (it throws ExecutionException / InterruptedException)
    AlterConfigsResult alterConfigsResult = client.incrementalAlterConfigs(configs);
    alterConfigsResult.all().get();
}

To apply the configuration automatically at startup, you can use a @PostConstruct method that collects all NewTopic beans and applies their configs:


    @Autowired
    private Set<NewTopic> topics;

    // kafkaBootstrapServers and log are fields of the enclosing class (not shown)
    @PostConstruct
    public void reconfigureTopics() throws ExecutionException, InterruptedException {
        try (final AdminClient adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) {
            // Build one list of SET operations per topic that declares configs
            adminClient.incrementalAlterConfigs(topics.stream()
                .filter(topic -> topic.configs() != null)
                .collect(Collectors.toMap(
                    topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic.name()),
                    topic -> topic.configs().entrySet()
                        .stream()
                        .map(e -> new ConfigEntry(e.getKey(), e.getValue()))
                        .peek(ce -> log.debug("configuring {} {} = {}", topic.name(), ce.name(), ce.value()))
                        .map(ce -> new AlterConfigOp(ce, AlterConfigOp.OpType.SET))
                        .collect(Collectors.toList())
                )))
                .all()
                .get(); // wait for the broker to apply the changes
        }
    }
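
The method above re-applies every configured entry on each startup. One possible refinement, sketched here under the assumption that the current values have already been read (for example with describeConfigs) into a plain map, is to send only the entries that actually differ. TopicConfigDiff and changedEntries are hypothetical names, not part of any library:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; it works on plain maps so it does not depend on kafka-clients.
public class TopicConfigDiff {

    // Returns the entries of `desired` whose value differs from, or is absent in, `current`.
    public static Map<String, String> changedEntries(Map<String, String> desired,
                                                     Map<String, String> current) {
        Map<String, String> changed = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : desired.entrySet()) {
            if (!e.getValue().equals(current.get(e.getKey()))) {
                changed.put(e.getKey(), e.getValue());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> desired = Map.of("retention.ms", "1680000");
        Map<String, String> current = Map.of("retention.ms", "604800000");
        System.out.println(changedEntries(desired, current)); // prints {retention.ms=1680000}
    }
}
```

Each entry of the returned map could then be turned into an AlterConfigOp with OpType.SET, as in the method above. An empty result means there is nothing to send.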
Archimedes Trajano
Sergi Almar
  • Thank you for the sample code. @Sergi, does alterConfigsResult.all() throw an exception if we try to change the retention of a non-existing topic? How can we know whether we are modifying an existing topic? – Rachan R K Jun 13 '19 at 05:42
  • 1
    Updating the config of an existing topic doesn't throw any exception. You could use the describeConfigs method to get the current configuration of an existing topic – Sergi Almar Jun 13 '19 at 07:57
1

You could use the AdminClient (https://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html) for this. Create an AdminClient instance in your application and use its create-topic or alter-config operations to manage topic configurations, including retention.

0

To create a topic using AdminClient programmatically with the specified retention time, do the following:

NewTopic topic = new NewTopic(topicName, numPartitions, replicationFactor);
topic.configs(Map.of(TopicConfig.RETENTION_MS_CONFIG, retentionMs.toString()));
adminClient.createTopics(List.of(topic));
Inego