5

I am trying to create a function that checks if a topic exists in my AWS MSK

func KafkaTopicExist(kafkabroker string, topic string) bool {
conn, err := kafka.Dial("tcp", kafkabroker)
if err != nil {
    log.WithError(err).Warn("Kafka broker connection error")
}
defer conn.Close()

partitions, err := conn.ReadPartitions()
if err != nil {
    log.WithError(err).Warn("Can not get all partitions to obtain ")
}

m := map[string]struct{}{}

for _, p := range partitions {
    m[p.Topic] = struct{}{}
}
_, exists := m[topic]
return exists

} kafkabroker is a string that takes MSK TLS private endpoint. I get a connection error. while using PLAINTEXT endpoint works. am I missing something is the config?

MBA
  • 353
  • 2
  • 11

0 Answers0