1

I'm trying to test rebalancing consumers. I've read about consumer groups and they don't work as I expected. I have a two consumers running in the same group like so:

package main

import (
    "errors"
    "net"
    "os"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/sirupsen/logrus"
)

type Callback func(*kafka.Message) error

type CallbackMap map[string]Callback

type Consumer struct {
    consumer    *kafka.Consumer
    name        string
    callbackMap CallbackMap
}


func NewKafkaConsumer(topics []string, name string) (*Consumer, error) {

    bootstrap := os.Getenv("KAFKA_HOSTNAME")

    consumer, err := kafka.NewConsumer(
        &kafka.ConfigMap{
            "bootstrap.servers":        bootstrap,
            "group.id":                 "testing",
            "security.protocol":        "plaintext",
            "go.events.channel.enable": true,
        },
    )

    if err != nil {
        logrus.WithError(err).Fatal("failed to create consumer")
        return nil, err
    }

    if err := consumer.SubscribeTopics(topics, nil); err != nil {
        logrus.WithError(err).Fatal("failed to subscribe to topics")
        return nil, err
    }

    return &Consumer{
        consumer:    consumer,
        name:        name,
        callbackMap: make(CallbackMap),
    }, nil
}

func (c *Consumer) Subscribe(key string, callback Callback) {
    c.callbackMap[key] = callback
}

func (c *Consumer) UnSubscribe(key string) {
    delete(c.callbackMap, key)
}

func (c *Consumer) HandleMessage(message *kafka.Message, chano chan error) {
    if message.TopicPartition.Error != nil {
        logrus.WithError(message.TopicPartition.Error).Warning()
        return
    }
    logrus.Errorf("CONSUMER NAME IS =========== %s", c.name)
    logrus.Errorf("MESSAGE is ============: %s", message)
    logrus.Errorf("VALUE IS =============: %s", string(message.Value))
    logrus.Errorf("PARTITION IS =========== %d", message.TopicPartition.Partition)
    if string(message.Value) == "20" {
        logrus.Errorf("Exited consumer: %s", c.name)
        chano <- errors.New("err")
        return
    }

}

func (c *Consumer) Consume() {
    logrus.Errorf("entering consumer: %s", c.name)
    chano := make(chan error)
    for event := range c.consumer.Events() {
        select {
        case _ = <-chano:
            err := c.consumer.Close()
            if err != nil {
                logrus.Errorf("Failed to close consumer")
                return
            }
            return
        default:
            switch e := event.(type) {
            case *kafka.Message:
                c.HandleMessage(e, chano)
            case kafka.Error:
                logrus.WithError(e).Warning("an error occurred while reading from topic")

            default:
                // Ignore other event types
            }
        }
    }
}

func main() {
    kafkaConsumer1, err := NewKafkaConsumer([]string{"test_topic"}, "one")
    kafkaConsumer2, err := NewKafkaConsumer([]string{"test_topic"}, "two")
    logrus.Errorf("started")
    if err != nil {
        logrus.Errorf("err %s", err)
        return
    }

    go kafkaConsumer1.Consume()
    go kafkaConsumer2.Consume()
    select {}

}

I am producing messages with sequential values. I expect that once the value hits 20, the consumer will stop and close and rebalancing will occur, with the other consumer starting to listen. This doesn't happen - once 20 hits, all consuming stops. Why is this happening? For example, consumer one started and the log I get is:

ERRO[0078] CONSUMER NAME IS =========== one             
ERRO[0078] MESSAGE is ============: test_topic[3]@947        
ERRO[0078] VALUE IS =============: 20                   
ERRO[0078] PARTITION IS =========== 3                   
ERRO[0078] Exited consumer: one     

And that's it.

Thanks

Omri. B
  • 375
  • 1
  • 13
  • 2
    Ideally, you run the go process twice rather than put two consumer clients in the same process – OneCricketeer Jun 28 '23 at 13:06
  • @OneCricketeer yes typo, i changed some names for SO because my names are embarassing :) edited – Omri. B Jun 28 '23 at 13:22
  • @OneCricketeer why won't this work in the same process? – Omri. B Jun 28 '23 at 13:23
  • 1
    Figured it out after reading some more in the docs, this line was key: `Events with the same event key (e.g., a customer or vehicle ID) are written to the same partition`. Changing the key to random made it work in both separate processes and the same. – Omri. B Jun 28 '23 at 14:07
  • That shouldn't matter for rebalancing. If you stop one consumer, all keys in one partition should still be consumed from another. The same process might work, it's just a single point of failure for one go process to get terminated for any reason by the OS, then you'll lose both consumers rather than only one – OneCricketeer Jun 29 '23 at 15:16

0 Answers0