We have multiple microservices written in Go that exchange messages over a Kafka message bus. One microservice writes to a Kafka topic that has 3 partitions and a replication factor of 2. We use AWS MSK for the Kafka brokers, and we use the Shopify Sarama Kafka client to connect to them.
Here is my Producer code -
package kf
import (
"fmt"
"github.com/Shopify/sarama"
"github.com/segmentio/kafka-go"
"net"
"strconv"
)
// Producer bundles a synchronous sarama producer with the single topic that
// WriteMessage publishes to.
type Producer struct {
// flowEventProducer is the sarama client used to send messages.
flowEventProducer sarama.SyncProducer
// topic is the Kafka topic every message is written to.
topic string
}
// InitProducer makes sure the topic exists and returns a Producer bound to it.
//
// brokers is the list of Kafka broker addresses: the first entry is used to
// bootstrap topic creation, and the whole list is handed to the sarama client.
// The function panics if brokers is empty, if topic creation fails (see
// CreateKafkaTopic), or if the sarama producer cannot be constructed.
func InitProducer(brokers []string, topic string) *Producer {
	// Guard the brokers[0] access below so that a missing configuration
	// produces a meaningful message instead of an index-out-of-range panic.
	if len(brokers) == 0 {
		panic("failed to connect to producer: no brokers configured")
	}
	CreateKafkaTopic(brokers[0], topic)

	prod, err := newFlowWriter(brokers)
	if err != nil {
		// Keep the underlying cause so that startup failures can be diagnosed.
		panic(fmt.Sprintf("failed to connect to producer: %v", err))
	}
	return &Producer{flowEventProducer: prod, topic: topic}
}
// CreateKafkaTopic creates topic with 3 partitions and a replication factor
// of 2.
//
// kafkaURL is the address of any broker in the cluster. It is only used to
// look up the cluster controller; the create-topics request itself is sent
// over a second connection dialed to that controller. Any failure along the
// way panics with the error text.
func CreateKafkaTopic(kafkaURL, topic string) {
	conn, err := kafka.Dial("tcp", kafkaURL)
	if err != nil {
		panic(err.Error())
	}
	// Register the close immediately so the bootstrap connection is released
	// on every exit path, including the panics below.
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}

	controllerAddr := net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))
	controllerConn, err := kafka.Dial("tcp", controllerAddr)
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     3,
			ReplicationFactor: 2,
		},
	}
	if err := controllerConn.CreateTopics(topicConfigs...); err != nil {
		panic(err.Error())
	}
}
// newFlowWriter builds a synchronous sarama producer connected to brokers.
//
// The client is configured for Kafka protocol version 2.6.2, hash
// partitioning on the message key, acknowledgement from the partition leader
// only (WaitForLocal), and at most 10 in-flight requests per connection.
// It panics if the hard-coded version string cannot be parsed; connection
// errors are returned to the caller.
func newFlowWriter(brokers []string) (sarama.SyncProducer, error) {
	cfg := sarama.NewConfig()

	const rawVersion = "2.6.2"
	parsed, parseErr := sarama.ParseKafkaVersion(rawVersion)
	if parseErr != nil {
		panic("failed to parse kafka version, producer will not run")
	}

	cfg.Version = parsed
	cfg.Net.MaxOpenRequests = 10
	cfg.Producer.Partitioner = sarama.NewHashPartitioner
	cfg.Producer.RequiredAcks = sarama.WaitForLocal
	// Return.Successes is enabled here because the producer is a SyncProducer.
	cfg.Producer.Return.Successes = true

	return sarama.NewSyncProducer(brokers, cfg)
}
// WriteMessage publishes data to the producer's topic using uuid as the
// message key.
//
// It returns the error reported by the underlying SyncProducer, or nil once
// the message has been sent, in which case the partition and offset assigned
// to the message are printed to standard output.
func (p *Producer) WriteMessage(uuid string, data []byte) error {
	msg := &sarama.ProducerMessage{
		Topic: p.topic,
		Key:   sarama.ByteEncoder(uuid),
		Value: sarama.ByteEncoder(data),
	}

	part, off, err := p.flowEventProducer.SendMessage(msg)
	if err != nil {
		return err
	}

	// Terminate the line so that consecutive log entries do not run together.
	fmt.Printf("message written on part:%d and offset: %d\n", part, off)
	return nil
}
Here is my consumer -
package kf
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/Shopify/sarama"
)
// Consumer holds a sarama consumer group together with the topic and broker
// addresses it was created for.
type Consumer struct {
// flowEventReader is the consumer group client used by HandleMessages.
flowEventReader sarama.ConsumerGroup
// topic is the single Kafka topic HandleMessages subscribes to.
topic string
// brokerUrls are the broker addresses the consumer group was created with.
brokerUrls []string
}
// data is the JSON shape that logMessage decodes each message value into.
type data struct {
// Name is read from the "name" JSON field.
Name string `json:"name"`
// Employee is read from the "employee" JSON field.
Employee string `json:"employee"`
}
// InitConsumer creates a Consumer that joins the "myconf" consumer group on
// the given brokers and remembers topic for later use by HandleMessages.
//
// It panics if the consumer group cannot be created; the panic message
// includes the underlying error.
func InitConsumer(brokers []string, topic string) *Consumer {
	conf := createSaramaKafkaConf()

	group, err := sarama.NewConsumerGroup(brokers, "myconf", conf)
	if err != nil {
		// Keep the underlying cause so that startup failures can be diagnosed.
		panic(fmt.Sprintf("failed to create consumer group on kafka cluster: %v", err))
	}

	return &Consumer{
		flowEventReader: group,
		topic:           topic,
		brokerUrls:      brokers,
	}
}
// KafkaConsumerGroupHandler is the handler passed to the sarama consumer
// group; it provides the Setup, Cleanup and ConsumeClaim methods.
type KafkaConsumerGroupHandler struct {
// Cons is the Consumer whose logMessage is invoked for every message.
Cons *Consumer
}
// HandleMessages consumes the consumer's topic and hands every message to a
// KafkaConsumerGroupHandler.
//
// Consume is called in a loop so that the consumer joins the next session
// each time the previous call returns. Errors are printed together with their
// cause. The method returns once the consumer group has been closed;
// otherwise it blocks for the lifetime of the consumer.
func (c *Consumer) HandleMessages() {
	handler := &KafkaConsumerGroupHandler{Cons: c}
	topics := []string{c.topic}

	for {
		err := c.flowEventReader.Consume(context.Background(), topics, handler)
		if err == nil {
			continue
		}

		fmt.Printf("FAILED: %v\n", err)

		// A closed group can never consume again; retrying would only spin.
		if errors.Is(err, sarama.ErrClosedConsumerGroup) {
			return
		}
	}
}
// Setup is the session-start hook of the consumer group handler. It performs
// no work and always returns nil.
func (*KafkaConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is the session-end hook of the consumer group handler. It performs
// no work and always returns nil.
func (*KafkaConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim processes the messages of one claimed partition.
//
// Each message is passed to the Consumer's logMessage and then marked as
// consumed on the session. The method returns nil when the claim's message
// channel is closed or when the session context is done, whichever happens
// first.
func (l *KafkaConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				// The claim was released; nothing more to read.
				return nil
			}
			l.Cons.logMessage(msg)
			sess.MarkMessage(msg, "")
		case <-sess.Context().Done():
			// Leave promptly when the session ends (e.g. on a rebalance)
			// instead of waiting for the message channel to be closed.
			return nil
		}
	}
}
// logMessage decodes the JSON value of msg into a data struct and prints it
// together with the message key.
//
// If the value is not valid JSON for data, the decode error is printed with
// the message key and nothing else is logged for that message.
func (c *Consumer) logMessage(msg *sarama.ConsumerMessage) {
	d := &data{}
	if err := json.Unmarshal(msg.Value, d); err != nil {
		// Do not print the zero-valued struct as if it were the payload.
		fmt.Printf("messages: key: %s could not be decoded: %v\n", string(msg.Key), err)
		return
	}
	// Terminate the line so that consecutive log entries do not run together.
	fmt.Printf("messages: key: %s and val:%+v\n", string(msg.Key), d)
}
// createSaramaKafkaConf returns the sarama configuration used by the consumer
// group: Kafka protocol version 2.6.2, consumption starting at the oldest
// available offset, and the round-robin balance strategy.
//
// It panics if the hard-coded version string cannot be parsed.
func createSaramaKafkaConf() *sarama.Config {
	cfg := sarama.NewConfig()

	const rawVersion = "2.6.2"
	parsed, parseErr := sarama.ParseKafkaVersion(rawVersion)
	if parseErr != nil {
		panic("failed to parse kafka version, executor will not run")
	}

	cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
		sarama.BalanceStrategyRoundRobin,
	}
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	cfg.Version = parsed

	return cfg
}
When we put the microservices under load and the producer starts producing on the order of 500 events, each about 1 KB in size, we see a delay of about 30 seconds in message delivery. We want messages to be delivered immediately after they are produced. I believe Kafka is more than capable of handling this use case. Please help me figure out the cause of this delay.