
I'm currently using kafka-python==2.0.1 with the following code:

from kafka import KafkaAdminClient
from kafka.admin import NewTopic

topic_name = "retention_test"
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=3, topic_configs={'log.retention.hours' : '100'})
response = admin.create_topics([topic])
print(response)

But I'm unable to create the topic; the call raises the following error:

raise error_type(
kafka.errors.InvalidConfigurationError: [Error 40] InvalidConfigurationError: Request 'CreateTopicsRequest_v3(create_topic_requests=[(topic='retention_test', num_partitions=1, replication_factor=3, replica_assignment=[], configs=[(config_key='log.retention.hours', config_value=100)])], timeout=30000, validate_only=False)' failed with response 'CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='retention_test', error_code=40, error_message='Unknown topic config name: log.retention.hours')])
krishna reddy

1 Answer


Krishna,

As far as I know, the topic_configs dictionary you are passing to NewTopic does not accept the property log.retention.hours. That name is a broker-level property, which is used as the default value when a topic is created without its own retention setting. When you set the configuration of a topic, you have to specify a topic-level property instead. This matches the broker's error message, 'Unknown topic config name: log.retention.hours'.

A topic-level property for log retention time is retention.ms.
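As a minimal sketch of that change (not tested against your cluster), the 100 hours can be converted to milliseconds and passed as retention.ms. The helper names retention_configs and create_topic_with_retention are made up for illustration; the kafka-python calls are the same ones used in the question, and the import sits inside the function only so the conversion helper can be tried without a broker.

```python
def retention_configs(hours):
    """Build a topic-level config dict equivalent to a retention of `hours`."""
    # retention.ms is expressed in milliseconds; config values are sent as strings.
    return {"retention.ms": str(hours * 60 * 60 * 1000)}


def create_topic_with_retention(bootstrap_servers, topic_name, hours,
                                num_partitions=1, replication_factor=3):
    """Hypothetical helper: create a topic whose retention is given in hours."""
    # Imported here so the helper above works without kafka-python installed.
    from kafka import KafkaAdminClient
    from kafka.admin import NewTopic

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
            # Topic-level name, not the broker-level log.retention.hours.
            topic_configs=retention_configs(hours),
        )
        return admin.create_topics([topic])
    finally:
        admin.close()
```

With the values from the question, the call would be create_topic_with_retention(['localhost:9092'], 'retention_test', 100). Note that replication_factor=3 still requires at least three brokers in the cluster.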

References:
(1) https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html
(2) http://kafka.apache.org/081/documentation.html#topic-config
(3) Cross-reference: changing kafka retention period during runtime