3

I'm trying to get a list of consumer groups subscribed to a topic in Kafka using segmentio/kakfa-go. This can be done using a script provided by kafka:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
  topic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic2          1          520678          803288          282610          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
  topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
  topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4

but I need to do this programatically. Is there a way to do it? I've tried the following with no success:

conn, err := kafka2.Dial("tcp", "localhost:9092")
    if err != nil {
        panic(err.Error())
    }
    defer conn.Close()

    partitions, err := conn.ReadPartitions("topic.test")
    if err != nil {
        panic(err.Error())
    }

    m := map[string]struct{}{}

    for _, p := range partitions {
        m[p.Topic] = struct{}{}

    }
    for k := range m {
        fmt.Println(k)
    }

    client := kafka2.Client{
        Addr:      kafka2.TCP(kafkaBroker),
        Timeout:   0,
        Transport: nil,
    }

    metReq := kafka2.MetadataRequest{
        Addr:   kafka2.TCP(kafkaBroker),
        Topics: []string{"topic.test"},
    }

    tmp, _ := client.Metadata(context.Background(), &metReq)

I've seen there an Kafka proxy API but I believe this is only available to the confluent implementation of Kafka, is that right?

thanks

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
HerbertRedford
  • 230
  • 4
  • 14
  • You're not doing anything with that tmp variable, so you tell us? Note that you need a consumer group id to get this information, and you're not using it... The Kafka REST Proxy is an open source tool and works for any Kafka cluster. It's just distributed by Confluent – OneCricketeer Jul 26 '23 at 14:57
  • thanks. Yeah, I was trying to retrieve the needed info from `tmp` but it's not available, that's why I didn't add anything in code. You're saying that I need a consumer ID to get this info, but that's the exact point from the question, I just want to know what consumer IDs are "subscribed" to the topic. Ie: Let's say I just know the Topic ID, no other data is known, and I want to know whether there's a consumer group already subscribed to that topic Id – HerbertRedford Jul 26 '23 at 15:38
  • @OneCricketeer so I should be able to perform REST requests against Kafka REST proxy independently of the Kafka package I'm using in Go, right? If I were to perform a call from a REST client it should work, right? – HerbertRedford Jul 26 '23 at 15:40
  • I haven't used the REST Proxy for this exact purpose, but with only a topic name, Kafka has no API to find all groups it is a part of. The workaround I've seen in the past is to use `kafka-consumer-groups --list`, then iterate all those, then `--describe` each one and filter – OneCricketeer Jul 26 '23 at 16:12
  • ok, but this approach you're talking about is using the kafka provided shell script, right? I need to do the same but programatically. – HerbertRedford Jul 26 '23 at 16:42
  • 1
    Those scripts wrap Java code with programmatic APIs. You can also call and parse shell scripts from Golang without external libraries – OneCricketeer Jul 26 '23 at 17:00

0 Answers0