4

I'm using the confluent-kafka-python package to interface with a Kafka server. I can successfully create the topic and push events to it. However, my problem lies when I spin up multiple nodes (running in Docker), if a second instance also tries to create the topic I get an error. I need to first check if the topic already exists before creating the new topic.

from confluent_kafka.admin import AdminClient, NewTopic
kafka_admin = AdminClient({"bootstrap.servers": server})

# First check here if the topic already exists!
if not topic_exists(topic):  # <-- how to accomplish this?
    new_kafka_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
    results = kafka_admin.create_topics([new_kafka_topic])

Thanks for any help!

nalyd88
  • 4,940
  • 8
  • 35
  • 51

2 Answers2

3

I had the same problem and I managed it in the following way:

client = AdminClient({"bootstrap.servers": BROKER_URL})
topic_metadata = client.list_topics()
if topic_metadata.topics.get(self.topic_name) is None:
  self.create_topic()
Luka Klarić
  • 323
  • 2
  • 16
0

The AdminClient class's list_topics method enables passing the topic name you want to check for, so you do not need to read the full (and potentially large) list of existing topics:

list_topics([topic=None][, timeout=-1])

Documentation: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#id0

Danny Varod
  • 17,324
  • 5
  • 69
  • 111