    conf := &kafka.ConfigMap{
        "bootstrap.servers":   "127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094",
        "min.insync.replicas": 3,
    }

    producer, err := kafka.NewProducer(conf)
    if err != nil {
        log.Println("Error initializing producer: ", err)
    }

I have initialized a Kafka producer this way, using Confluent's Go package (confluent-kafka-go).

    producerErr := p.Producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &topic,
            Partition: kafka.PartitionAny,
        },
        Key:   []byte("Message"),
        Value: msg,
    }, nil)

This produces a message to a topic, but how do I set the replication factor for that topic? Alternatively, please suggest a way to set the number of brokers and the replication factor for production.

OneCricketeer
Note: You should not run 3 brokers locally. Your one host is still a single point of failure, and it is probably using one disk to store the same data multiple times, so it will be slower than a single broker... – OneCricketeer Apr 20 '23 at 22:36

1 Answer


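Create the topic before you produce to it. Producers don't create topics themselves, so they cannot set the replication factor (or the partition count, or other topic configs). At most, a broker running with auto.create.topics.enable=true creates a missing topic using its own defaults (num.partitions, default.replication.factor).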

You need to use AdminClient to set the replication factor (and other settings) of a new topic:

    // Create a new AdminClient.
    // AdminClient can also be instantiated using an existing
    // Producer or Consumer instance, see NewAdminClientFromProducer and
    // NewAdminClientFromConsumer.
    a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
...
    results, err := a.CreateTopics(
        ctx,
        // Multiple topics can be created simultaneously
        // by providing more TopicSpecification structs here.
        []kafka.TopicSpecification{{
            Topic:             topic,
            NumPartitions:     numParts,
            ReplicationFactor: replicationFactor}},
...

https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/admin_create_topic/admin_create_topic.go

The only way you should "add replicas" to an existing topic is with kafka-reassign-partitions.sh, which is included with the Kafka CLI commands.

OneCricketeer