2

I've never used kafka before. I have two test Go programs accessing a local kafka instance: a reader and a writer. I'm trying to tweak my producer, consumer, and kafka server settings to get a particular behavior.

My writer:

package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    topics := []string{
        "policymanager-100",
        "policymanager-200",
        "policymanager-300",
    }
    progress := make(map[string]int)
    for _, t := range topics {
        progress[t] = 0
    }

    producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "0",
    })
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    fmt.Println("producing messages...")
    for i := 0; i < 30; i++ {
        index := rand.Intn(len(topics))
        topic := topics[index]
        num := progress[topic]
        num++
        fmt.Printf("%s => %d\n", topic, num)
        msg := &kafka.Message{
            Value: []byte(strconv.Itoa(num)),
            TopicPartition: kafka.TopicPartition{
                Topic: &topic,
            },
        }
        err = producer.Produce(msg, nil)
        if err != nil {
            panic(err)
        }
        progress[topic] = num
        time.Sleep(time.Millisecond * 100)
    }
    fmt.Println("DONE")
}

There are three topics that exist on my local kafka: policymanager-100, policymanager-200, policymanager-300. They each only have 1 partition to ensure all messages are sorted by the time kafka receives them. My writer will randomly pick one of those topics and issue a message consisting of a number that increments solely for that topic. When it's done running, I expect the queues to look something like this (topic names shortened for legibility):

100: 1 2 3 4 5 6 7 8 9 10 11
200: 1 2 3 4 5 6 7
300: 1 2 3 4 5 6 7 8 9 10 11 12

So far so good. I'm trying to configure things so that any number of consumers can be spun up and consume these messages in order. By "in-order" I mean that no consumer should get message 2 for topic 100 until message 1 is COMPLETED (not just started). If message 1 for topic 100 is being worked on, consumers are free to consume from other topics that currently don't have a message being processed. If a message of a topic has been sent to a consumer, that entire topic should become "locked" until either a timeout assumes that the consumer failed or the consumer commits the message, then the topic is "unlocked" to have it's next message made available to be consumed.

My reader:

package main

import (
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    count := 2
    for i := 0; i < count; i++ {
        go consumer(i + 1)
    }
    fmt.Println("cosuming...")
    // hold this thread open indefinitely
    select {}
}

func consumer(id int) {
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  "localhost",
        "group.id":           "0", // strconv.Itoa(id),
        "enable.auto.commit": "false",
    })
    if err != nil {
        panic(err)
    }

    c.SubscribeTopics([]string{`^policymanager-.+$`}, nil)
    for {
        msg, err := c.ReadMessage(-1)
        if err != nil {
            panic(err)
        }

        fmt.Printf("%d) Message on %s: %s\n", id, msg.TopicPartition, string(msg.Value))
        time.Sleep(time.Second)
        _, err = c.CommitMessage(msg)
        if err != nil {
            fmt.Printf("ERROR commiting: %+v\n", err)
        }
    }
}

From my current understanding, the way I'm likely to achieve this is by setting up my consumer properly. I've tried many different variations of this program. I've tried having all my goroutines share the same consumer. I've tried using a different group.id for each goroutine. None of these was the right configuration to get the behavior I'm after.

What the posted code does is empty out one topic at a time. Despite having multiple goroutines, the process will read all of 100 then move to 200 then 300 and only one goroutine will actually do all the reading. When I let each goroutine have a different group.id then messages get read by multiple goroutines which I would like to prevent.

My example consumer is simply breaking things up with goroutines but when I begin working this project into my use case at work, I'll need this to work across multiple kubernetes instances that won't be talking to each other so using anything that interacts between goroutines won't work as soon as there are 2 instances on 2 kubes. That's why I'm hoping to make kafka do the gatekeeping I want.

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
Corey Ogburn
  • 24,072
  • 31
  • 113
  • 188
  • librdkafka (which the Confluent Golang client is based on) doesn't yet have exactly once semantics. You can follow https://github.com/confluentinc/confluent-kafka-go/issues/104 (and the linked issue) for more information. – OneCricketeer Jan 23 '19 at 23:09
  • It looks like Consumer Transactions is what I'm after. As long as a consumer moves to a different topic rather than get the next message after messages locked by transaction. – Corey Ogburn Jan 23 '19 at 23:23
  • Possible duplicate of https://stackoverflow.com/questions/42165726/kafka-only-once-consumption-guarantee – Raunak Jhawar Jan 24 '19 at 04:50
  • @RaunakJhawar Sort of. While I do want to process messages only once, my biggest concern right now is the ability to prevent out of order processing. If one consumer takes offset=1 but hasn't commited it yet, I don't want a different consumer to receive offset=2. I'd rather the second consumer find a different topic that doesn't have a "sent but uncommitted" message at the front of the queue. If all topics have a message being processed by a different consumer, I'd rather wait than risk doing things out of order. – Corey Ogburn Jan 24 '19 at 16:35
  • Generally speaking, you cannot. Even if you had a single consumer that consumed all the partitions for the topic, the partitions would be consumed in a non-deterministic order and your total ordering across all partitions would not be guaranteed. – Raunak Jhawar Jan 24 '19 at 19:40
  • @RaunakJhawar Are there any big problems with just having one partition per topic? That should enforce order from what I've read. I'm not expecting any topic to get very large. – Corey Ogburn Jan 24 '19 at 20:11
  • More partitions lead to higher throughput, hence a higher degree of parallelism and hence better use of cluster resources. – Raunak Jhawar Jan 24 '19 at 20:14
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/187285/discussion-between-corey-ogburn-and-raunak-jhawar). – Corey Ogburn Jan 24 '19 at 20:16

1 Answers1

1

Generally speaking, you cannot. Even if you had a single consumer that consumed all the partitions for the topic, the partitions would be consumed in a non-deterministic order and your total ordering across all partitions would not be guaranteed.

Try Keyed Messages, think you may find this of good use for your use case.

Raunak Jhawar
  • 1,541
  • 1
  • 12
  • 21