
Application functionality: The service consumes messages from a Kafka topic and puts them into a buffered channel. A goroutine reads from that channel and performs the required message processing. If processing fails, the message is published back to the same Kafka topic it was consumed from.
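The pipeline described above can be sketched without the Kafka client itself; `record`, `process`, and the failure condition are hypothetical stand-ins, and the real application would call `producer.Produce` where `drain` collects failed keys:

```go
package main

import "fmt"

// record stands in for a consumed Kafka message; the field names
// and the failing condition below are hypothetical.
type record struct{ key, value string }

// process is a stand-in for the real message processing; here it
// fails whenever the value is empty.
func process(r record) error {
	if r.value == "" {
		return fmt.Errorf("empty value for key %q", r.key)
	}
	return nil
}

// drain reads every record from the buffered channel, processes it,
// and returns the keys of records that failed and would be produced
// back to the source topic in the real application.
func drain(buf <-chan record) []string {
	var failed []string
	for r := range buf {
		if err := process(r); err != nil {
			failed = append(failed, r.key)
		}
	}
	return failed
}

func main() {
	buf := make(chan record, 16) // buffered channel fed by the consumer loop
	buf <- record{"a", "ok"}
	buf <- record{"b", ""} // will fail processing
	close(buf)
	fmt.Println("to requeue:", drain(buf))
}
```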

Problem: When I put the application under load, it sometimes crashes with the error below while the producer is about to close / closing.

fatal error: unexpected signal during runtime execution
[signal SIGSEGV: segmentation violation code=0x1 addr=0x7f518cca5280 pc=0xbe690a]

runtime stack:
runtime.throw({0x1104c3f?, 0x1000?})
    /usr/local/go/src/runtime/panic.go:992 +0x71
runtime.sigpanic()
    /usr/local/go/src/runtime/signal_unix.go:802 +0x3a9

goroutine 7620665 [syscall]:
runtime.cgocall(0xb37af0, 0xc00426fb00)
    /usr/local/go/src/runtime/cgocall.go:157 +0x5c fp=0xc00426fad8 sp=0xc00426faa0 pc=0x46cc7c
github.com/confluentinc/confluent-kafka-go/kafka._Cfunc_do_produce(0x7f518cca4930, 0x7f518d6b8ea0, 0xffffffff, 0x2, 0x0, 0xc00bfd2000, 0xff5, 0x1, 0xc0053c85ec, 0x0, ...)
    _cgo_gotypes.go:773 +0x4f fp=0xc00426fb00 sp=0xc00426fad8 pc=0x94bcef
github.com/confluentinc/confluent-kafka-go/kafka.(*Producer).produce.func2(0x2?, 0x7f518d6b8ea0, 0x1346248?, 0x0, 0x1d3?, 0xc00426fc98, 0xc0226741c8?, 0x12946a80?, 0xc00426fcc8, 0x0, ...)
    /go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v1.9.1/kafka/producer.go:263 +0x248 fp=0xc00426fbb8 sp=0xc00426fb00 pc=0x962de8
github.com/confluentinc/confluent-kafka-go/kafka.(*Producer).produce(0xc00d684580, 0xc00426fec0, 0x4cd07b?, 0xc0100970e0)
    /go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v1.9.1/kafka/producer.go:263 +0x50e fp=0xc00426fd90 sp=0xc00426fbb8 pc=0x9628ce
github.com/confluentinc/confluent-kafka-go/kafka.(*Producer).Produce(...)
    /go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v1.9.1/kafka/producer.go:285
server/queue.PublishToOutQ({0xc00bfd2000, 0xff5, 0xff5}, {0xc012a28d40, 0x8}, 0x0?, {0xc012908f60, 0x14}, {0x10e221f, 0x2}, ...)
    /tmp/test-server/queue/producer.go:256 +0x211 fp=0xc00426ff78 sp=0xc00426fd90 pc=0xb168b1
server/queue.SendMsg.func3()
    /tmp/test-server/queue/consumer.go:132 +0x57 fp=0xc00426ffe0 sp=0xc00426ff78 pc=0xb0aeb7
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:1571 +0x1 fp=0xc00426ffe8 sp=0xc00426ffe0 pc=0x4d1d21
created by server/queue.SendMsg
    /tmp/test-server/queue/consumer.go:132 +0xb4f

**Go Version:**

1.18

**Confluent Kafka Version:**

github.com/confluentinc/confluent-kafka-go v1.9.1

Kafka consumer configuration:

"consumer": {
    "queued.max.messages.kbytes.int": "1000000",
    "fetch.message.max.bytes.int": "1048576",
    "receive.message.max.bytes.int": "100000000",
    "max.partition.fetch.bytes.int": "1048576",
    "auto.offset.reset.string": "latest",
    "enable.auto.commit.boolean": "true",
    "auto.commit.interval.ms.int": "5000",
    "socket.keepalive.enable.boolean": "true",
    "metadata.max.age.ms.int": "900000",
    "session.timeout.ms.int": "30000",
    "heartbeat.interval.ms.int": "3000",
    "max.poll.interval.ms.int": "60000",
    "metadata.request.timeout.ms.int": "30000",
    "topic.metadata.refresh.interval.ms.int": "120000",
    "socket.timeout.ms.int": "10000",
    "fetch.wait.max.ms.int": "100",
    "statistics.interval.ms.int": "25000"
}

Kafka producer configuration:

"producer": {
    "socket.keepalive.enable.boolean": "true",
    "metadata.max.age.ms.int": "900000",
    "metadata.request.timeout.ms.int": "30000",
    "topic.metadata.refresh.interval.ms.int": "120000",
    "retries.int": "1000000",
    "message.timeout.ms.int": "300000",
    "socket.timeout.ms.int": "10000",
    "max.in.flight.requests.per.connection.int": "5",
    "enable.idempotence.boolean": "true",
    "statistics.interval.ms.int": "25000"
}
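The `.int`/`.boolean`/`.string` suffixes on the keys above look like an application convention for typing string values, not part of librdkafka's key names. Assuming that, a sketch of converting one entry before handing it to the client's config map (`toTyped` is hypothetical):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// toTyped strips the ".int"/".boolean"/".string" suffix that the
// configuration above appends to each librdkafka key and converts
// the string value to the matching Go type. The suffix convention
// is assumed to be this application's own, not librdkafka's.
func toTyped(key, value string) (string, interface{}, error) {
	switch {
	case strings.HasSuffix(key, ".int"):
		v, err := strconv.Atoi(value)
		return strings.TrimSuffix(key, ".int"), v, err
	case strings.HasSuffix(key, ".boolean"):
		v, err := strconv.ParseBool(value)
		return strings.TrimSuffix(key, ".boolean"), v, err
	default:
		return strings.TrimSuffix(key, ".string"), value, nil
	}
}

func main() {
	k, v, _ := toTyped("retries.int", "1000000")
	fmt.Println(k, v) // retries 1000000
	k, v, _ = toTyped("enable.idempotence.boolean", "true")
	fmt.Println(k, v) // enable.idempotence true
}
```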

Can you please help me fix this issue? Thanks in advance.

The Kafka publish should happen without errors, but it is failing with a segmentation violation.
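Since the crash happens "when producer is about to close / closing" and dies inside the cgo call `_Cfunc_do_produce`, one plausible cause is a `Produce` racing `Close` and touching freed librdkafka state. A minimal sketch of serializing the two, assuming that race; the `rawProducer` interface and wrapper are hypothetical, standing in for `*kafka.Producer`:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// rawProducer abstracts the underlying producer (e.g. a
// confluent-kafka-go *kafka.Producer) for this sketch.
type rawProducer interface {
	Produce(value string) error
	Close()
}

// safeProducer guarantees Close never overlaps an in-flight
// Produce: writers (Close) wait for all readers (Produce) to
// drain before the underlying Close runs.
type safeProducer struct {
	mu     sync.RWMutex
	closed bool
	p      rawProducer
}

func (s *safeProducer) Produce(value string) error {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s.closed {
		return errors.New("producer already closed")
	}
	return s.p.Produce(value)
}

func (s *safeProducer) Close() {
	s.mu.Lock() // blocks until every in-flight Produce has returned
	s.closed = true
	s.mu.Unlock()
	// With the real client, call p.Flush(timeoutMs) here before
	// Close so queued messages are delivered first.
	s.p.Close()
}

// fakeProducer records calls so the wrapper can be demonstrated
// without a broker.
type fakeProducer struct{ produced int }

func (f *fakeProducer) Produce(string) error { f.produced++; return nil }
func (f *fakeProducer) Close()               {}

func main() {
	sp := &safeProducer{p: &fakeProducer{}}
	fmt.Println(sp.Produce("msg-1")) // <nil>
	sp.Close()
	fmt.Println(sp.Produce("msg-2")) // producer already closed
}
```

After `Close` has run, every later `Produce` fails with a Go error instead of reaching the C layer, which is the behavior the crash suggests is missing.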

  • Show your code as a [mcve]. Do not produce the record back to the same topic. Use a separate dead-letter topic. Otherwise you will end up in an endless retry loop. – OneCricketeer Dec 07 '22 at 21:56
  • @OneCricketeer Can we use a dead-letter topic for consuming the failed messages? Because I need to process the same messages again. – Vani Polnedi Dec 08 '22 at 12:11
  • You can. It's just another topic. You can subscribe a consumer to multiple topics at a time and check which topic any consumed record came from while processing. But again, don't produce back into the dead-letter topic, just don't commit the offsets and seek the consumer backwards, and let the consumer poll the data again. In other words, consuming doesn't remove the data, so don't duplicate failed processed records on the topic. – OneCricketeer Dec 08 '22 at 17:04
  • @OneCricketeer Thanks for the swift response. We kept `"enable.auto.commit.boolean": "true"`. So, is there any way to uncommit a particular offset value so that the consumer consumes the failed messages again? Also, can you please provide a sample dead-letter topic implementation in Go? I'm new to this concept and unable to find samples related to it. – Vani Polnedi Dec 09 '22 at 05:13
  • You can `Seek` backwards to a particular offset/timestamp and `Commit` to "undo" any commit. Regarding "dead-letter", that is just a pattern, not any specific tooling is needed. You call producer send function to some other `"retry-topic"`. Then you run a secondary consumer to read from that topic instead, then let the "main consumer" continue on its way, and ignore having to deal with "retries" beyond sending the consumed record to that other topic – OneCricketeer Dec 09 '22 at 20:56
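The routing policy the comments describe (never produce back to the source topic; divert to a retry topic, then a dead-letter topic) can be sketched as a pure function. The `-retry`/`-dlq` topic suffixes and the `retries` header key are illustrative conventions, not part of any Kafka API:

```go
package main

import (
	"fmt"
	"strconv"
)

// nextDestination picks where a failed record should be produced:
// a retry topic while retries remain, a dead-letter topic after
// that. It returns the updated headers carrying the retry count.
func nextDestination(srcTopic string, headers map[string]string, maxRetries int) (string, map[string]string) {
	retries, _ := strconv.Atoi(headers["retries"]) // a missing header parses as 0
	out := map[string]string{"retries": strconv.Itoa(retries + 1)}
	if retries >= maxRetries {
		return srcTopic + "-dlq", out
	}
	return srcTopic + "-retry", out
}

func main() {
	topic, h := nextDestination("orders", nil, 2)
	fmt.Println(topic, h["retries"]) // orders-retry 1
	topic, h = nextDestination("orders", h, 2)
	fmt.Println(topic, h["retries"]) // orders-retry 2
	topic, h = nextDestination("orders", h, 2)
	fmt.Println(topic, h["retries"]) // orders-dlq 3
}
```

In the real application, a secondary consumer subscribed to both the retry and the source topic would check each message's topic (available on the consumed message's `TopicPartition`) to decide how to handle it, as the last comment suggests.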

0 Answers