So far I haven't seen a python client that implements the creation of a topic explicitly without using the configuration option to create automatically the topics.
7 Answers
You can programmatically create topics using either kafka-python
or confluent_kafka
client which is a lightweight wrapper around librdkafka.
Using kafka-python
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:9092",
client_id='test'
)
topic_list = []
topic_list.append(NewTopic(name="example_topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
Using confluent_kafka
from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient({
"bootstrap.servers": "localhost:9092"
})
topic_list = []
topic_list.append(NewTopic("example_topic", 1, 1))
admin_client.create_topics(topic_list)

- 36,235
- 20
- 134
- 156
-
could you add topic configuration e.g max.message.bytes=1000000 example for confluent_kafka – Rubber Duck Dec 01 '19 at 07:09
-
1Using the above technique I'm able to create Topic(s), I get the response {'jjd_topic1':
} but when I then List the topics the new topic is not in the list... it's like it doesn't persist it. What could be happening here? – JamesD Jun 04 '21 at 17:28 -
1I also see no new topic in kafka here as a result by using the confluent_kafka answer. – Mayak Oct 05 '21 at 21:11
-
1I also saw similar issue, but once you produce a message on that topic, it starts showing up. ` from confluent_kafka import Producer producer = Producer({ "bootstrap.servers": "localhost:9092" }) producer.produce("example_topic", key='key1', value='value1') producer.flush() ` – user3900576 Mar 02 '22 at 02:48
-
Is there way to create topic using kafka default properties? I mean without explicitly mentioning partitions and replication factor! – sandeep P Dec 08 '22 at 10:19
-
@JamesD i think you need to wait for the operation to finish. See Basic AdminClient Example https://github.com/confluentinc/confluent-kafka-python#basic-adminclient-example – Winand Mar 23 '23 at 12:05
If you can run confluent_kafka
(Python) v0.11.6
or above, then the following is how to create kafka topics
, list kafka topics
and delete kafka topics
:
>>> import confluent_kafka.admin, pprint
>>> conf = {'bootstrap.servers': 'broker01:9092'}
>>> kafka_admin = confluent_kafka.admin.AdminClient(conf)
>>> new_topic = confluent_kafka.admin.NewTopic('topic100', 1, 1)
# Number-of-partitions = 1
# Number-of-replicas = 1
>>> kafka_admin.create_topics([new_topic,]) # CREATE (a list(), so you can create multiple).
{'topic100': <Future at 0x7f524b0f1240 state=running>} # Stdout from above command.
>>> pprint.pprint(kafka_admin.list_topics().topics) # LIST
{'topic100' : TopicMetadata(topic100, 1 partitions),
'topic99' : TopicMetadata(topic99, 1 partitions),
'topic98' : TopicMetadata(topic98, 1 partitions)}
And to delete kafka topics
using that same kafka_admin
object, this:
kafka_admin.delete_topics(['topic99', 'topic100',]) # DELETE
I hope this helps. \(◠﹏◠)/

- 5,215
- 6
- 57
- 64
-
P.S. Where possible, I prefer the `confluent_kafka` library to the `kafka-python` library because the former is a "thin wrapper" (quoting the Confluent literature) over the `librdkafka C/C++ library`; and therefore performant. Though, in fairness, `kafka-python` is more Pythonic and both libraries work well. – NYCeyes Jan 25 '19 at 22:10
-
2When I open a python shell and I type your code in line for line it works 100% - I create the topic and when I list all topics it it there. But when I put it in a .py file and run the file, for some reason it does not create even though it does not give an error. If I copy every line in the file, and paste it into a python shell, then it creates again. The exact same code works in the shell but not in a file... – Alfa Bravo Jan 04 '20 at 13:48
-
@AlfaBravo Hmmm... Are you sure the program is running at all? Try inserting `print('hello')` statements around the code to check. – NYCeyes Jan 04 '20 at 17:10
-
2Yes, I have experienced that first hand - the program runs, and the client returns information about the topics that got created {'jjd_topic1':
}, but when I run the list, they are not there. Like they never got committed. Why would that be? – JamesD Jun 04 '21 at 21:45 -
1Super bizarre -- @AlfaBravo I am experiencing the same issue. If I run as a .py file it doesn't create the topic, but If I go into a python shell and do it it gets created... – Nicole White Jun 29 '21 at 22:53
It looks like you can can use the following to ensure that your topic already exist (I assume you are using the following kafka python implementation):
client = KafkaClient(...)
producer = KafkaProducer(...)
client.ensure_topic_exists('my_new_topic')
producer.send_messages('my_new_topic', ...)

- 214,103
- 147
- 703
- 753
-
4That won't work. `ensure_topic_exists` only works with the auto topic creation enabled. https://github.com/mumrah/kafka-python/blob/cd81cf0ec8c1b7e7651374c5d1cbd105d003d352/kafka/client.py#L305-L306 – zackdever May 15 '15 at 00:48
It seems that there is no kafka server api to create a topic so you have to use topic automatic creation of the or the command line tool:
bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
It's already too late. I don't know about a command for explicit creation of the topics but the following creates and adds the messages.
I created a python kafka producer:
prod = KafkaProducer(bootstrap_servers='localhost:9092')
for i in xrange(1000):
prod.send('xyz', str(i))
In the list of Kafka topics xyz
was not there previously. when I did the above method, the Python-kafka client created it and added the messages to it.

- 885
- 1
- 18
- 32
-
3Actually the broker created the topic and only because auto.topic.create.enable was set to "true". All topics created this way will have the default configuration with may or may not be good for your use case. – Hans Jespersen Jun 21 '17 at 06:11
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = 'topic-name'
producer.send(topic, final_list[0]).get(timeout=10)

- 1,371
- 16
- 15
-
-
@BorisTsema Please refer: https://github.com/dpkp/kafka-python/blob/master/kafka/producer/kafka.py#L516 It's a topic value, if I'm not wrong. The data that you want to send to consumer. – Mohsin Aljiwala Sep 24 '18 at 14:32
The AdminClient API needed to do programmatic topic creation and configuration was just added in Kafka 0.11 (initially for Java)
It is expected that non-Java client libraries will add this functionality as well over time. Check with the author of the Kafka Python client you are using (there are several) to see if and when KIP-4 admin protocol support will be in the API

- 8,024
- 1
- 24
- 31