-1

I'm benchmarking a simple webserver written in Go using wrk. The server is running on a machine with 4GB RAM. At the beginning of the test, the performance is very good with the code serving up to 2000 requests/second. But as time goes on, memory utilized by the process creeps up and once it reaches 85% (I'm checking this using top), the throughput drops to ~100 requests/second. The throughput again increases to optimal amounts once I restart the server.

Is the degradation of performance due to memory issues? Why isn't Go releasing this memory? My Go server looks like this:

func main() {
    defer func() {
        // Wait for all messages to drain out before closing the producer
        p.Flush(1000)
        p.Close()
    }()

    http.HandleFunc("/endpoint", handler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

In the handler, I convert the incoming Protobuf message to a Json and write it to Kafka using confluent Kafka Go library.

var p, err = kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "abc-0.com:6667,abc-1.com:6667",
    "message.timeout.ms": "30000",
    "sasl.kerberos.keytab": "/opt/certs/TEST.KEYTAB",
    "sasl.kerberos.principal": "TEST@TEST.ABC.COM",
    "sasl.kerberos.service.name": "kafka",
    "security.protocol": "SASL_PLAINTEXT",
})

var topic = "test"

func handler(w http.ResponseWriter, r *http.Request) {
    body, _ := ioutil.ReadAll(r.Body)

    // Deserialize byte[] to Protobuf message
    protoMessage := &tutorial.REALTIMEGPS{}
    _ := proto.Unmarshal(body, protoMessage)

    // Convert Protobuf to Json
    realTimeJson, _ := convertProtoToJson(protoMessage)

    _, err := fmt.Fprintf(w, "")

    if err != nil {
        log.Fatal(responseErr)
    }

    // Send to Kafka
    produceMessage([]byte(realTimeJson))
}

func produceMessage(message []byte) {
    // Delivery report
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    log.Println("Delivery failed: ", ev.TopicPartition)
                } else {
                    log.Println("Delivered message to ", ev.TopicPartition)
                }
            }
        }
    }()

    // Send message
    _ := p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          message,
    }, nil)
}

func convertProtoToJson(pb proto.Message) (string, error) {
    marshaler := jsonpb.Marshaler{}
    json, err := marshaler.MarshalToString(pb)
    return json, err
}
Harshith Bolar
  • 728
  • 1
  • 10
  • 29
  • 2
    We can't really help without seeing what your `handler` does. As general notes, try to reduce allocations, use pools of "big" buffers and data structures, and always close / free resources as soon as they become unneeded. – icza Sep 13 '19 at 08:17
  • @icza I've added the handler – Harshith Bolar Sep 13 '19 at 08:48
  • 4
    One thing that looks odd: in each of your request at the end you call `produceMessage()`, which sends a message to kafka, and launches a goroutine to receive events for checking errors. Why do you do this? I believe a _single_ goroutine would be sufficient to do this. – icza Sep 13 '19 at 08:52
  • I followed the recommended method provided here - https://github.com/confluentinc/confluent-kafka-go > Producing is an asynchronous operation so the client notifies the application of per-message produce success or failure through something called delivery reports. – Harshith Bolar Sep 13 '19 at 08:55
  • 5
    The page you linked launches a _single_ goroutine for delivery report purposes. You launch one for every incoming request. Don't do that. Just launch one, after your `p` variable is setup. Your code launches goroutines non-stop as incoming requests come in, and they don't end until something goes wrong with your kafka client. – icza Sep 13 '19 at 08:57
  • about defer https://stackoverflow.com/a/17888654/4466350 –  Sep 13 '19 at 09:00
  • Thank you @icza, that fixed the issue. – Harshith Bolar Sep 13 '19 at 09:35

1 Answers1

5

The problem is that at the end of each of your request you call produceMessage(), which sends a message to kafka, and launches a goroutine to receive events for checking errors.

Your code launches goroutines non-stop as incoming requests come in, and they don't end until something goes wrong with your kafka client. This requires more and more memory, and likely more and more CPU as more goroutines are to be scheduled.

Don't do this. A single goroutine is sufficient for delivery report purposes. Launch a single goroutine when your p variable is set, and you're good to go.

For example:

var p *kafka.Producer

func init() {
    var err error
    p, err = kafka.NewProducer(&kafka.ConfigMap{
        // ...
    }
    if err != nil {
        // Handle error
    }

    // Delivery report
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    log.Println("Delivery failed: ", ev.TopicPartition)
                } else {
                    log.Println("Delivered message to ", ev.TopicPartition)
                }
            }
        }
    }()
}
icza
  • 389,944
  • 63
  • 907
  • 827