
Can I get an example of creating a topic using segmentio's kafka-go?

I have tried creating a topic as below:

c, _ := kafka.Dial("tcp", "host:port")
kt := kafka.TopicConfig{Topic: "sometopic", NumPartitions: 1, ReplicationFactor: 1}
e := c.CreateTopics(kt)

But this works only if the given host:port is the Kafka leader (the cluster controller). If it is not, I get this error:

Not Controller: this is not the correct controller for this cluster

What is the right way to pass the cluster's address when creating a topic?

Kafka Segmentio: github.com/segmentio/kafka-go

shmsr
prabhu

2 Answers


This is what you need:

func (c *Conn) Controller() (broker Broker, err error)
// Controller requests kafka for the current controller and returns its URL

When you open the connection in your code using Dial, you are connecting to just one of the brokers in the cluster, so you may or may not end up on the actual Kafka controller. Looking up the controller with Controller() and opening a new connection to it should solve this.

https://pkg.go.dev/github.com/segmentio/kafka-go?tab=doc#Conn.Controller

shmsr

As shmsr said, you need a connection to the controller to create topics. You can do that in the following way:

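// Needs the standard "net" and "strconv" packages in addition to kafka-go.
// Connect to any reachable broker first.
conn, err := kafka.Dial("tcp", "host:port")
if err != nil {
    panic(err.Error())
}
defer conn.Close()

// Ask that broker which node is currently the cluster controller.
controller, err := conn.Controller()
if err != nil {
    panic(err.Error())
}

// Open a second connection directly to the controller.
controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err.Error())
}
defer controllerConn.Close()

topicConfigs := []kafka.TopicConfig{{Topic: "sometopic", NumPartitions: 1, ReplicationFactor: 1}}

// Send the topic creation request over the controller connection.
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
    panic(err.Error())
}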
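
The address-building step above can be tried on its own without a running cluster. The following is only a sketch: `broker` is a hypothetical stand-in struct that models just the Host and Port fields used above, and `controllerAddr` is an illustrative helper name, not part of kafka-go. It shows why net.JoinHostPort is a reasonable choice here, since it also brackets IPv6 hosts correctly.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// broker is a hypothetical stand-in for the value returned by Controller();
// only the Host and Port fields used in the answer are modeled.
type broker struct {
	Host string
	Port int
}

// controllerAddr formats a dialable "host:port" string for the broker.
func controllerAddr(b broker) string {
	return net.JoinHostPort(b.Host, strconv.Itoa(b.Port))
}

func main() {
	fmt.Println(controllerAddr(broker{Host: "10.0.0.5", Port: 9092})) // 10.0.0.5:9092
	fmt.Println(controllerAddr(broker{Host: "::1", Port: 9092}))      // [::1]:9092
}
```

Building the address with string concatenation instead would produce an ambiguous "::1:9092" for the IPv6 case, which Dial would not be able to parse as intended.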
Norbert