
The Situation

I am using kafkajs to write to some dynamically generated kafka topics.

I am finding that writing to those topics immediately after registering my producer regularly causes an error: There is no leader for this topic-partition as we are in the middle of a leadership election.

The full error is:

{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}

The Code

Here is the code that is causing the problem:

import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  await producer.connect()
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

The Question

Two questions:

  1. Is there anything special I should be doing when connecting the producer (or sending) to ensure that the logic blocks until the producer is truly ready to send data to a Kafka topic?
  2. Is there anything special I should be doing when sending data with the producer to ensure that messages are not dropped?
slifty

1 Answer


The Solution

KafkaJS offers a createTopics method through the admin client, which accepts an optional waitForLeaders flag:

admin.createTopics({
  waitForLeaders: true,
  topics: [
    { topic: 'myRandomTopicString123' },
  ],
})

Using this resolves the problem.

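import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  const admin = kafka.admin()
  await admin.connect()
  await producer.connect()
  // Create the topic and block until its partitions have elected leaders
  await admin.createTopics({
    waitForLeaders: true,
    topics: [
      { topic: 'myRandomTopicString123' },
    ],
  })
  // Send to the topic created above; awaiting surfaces failures as a rejection
  await producer.send({
    topic: 'myRandomTopicString123',
    messages: [{
      value: 'yolo',
    }],
  })
  await admin.disconnect()
  await producer.disconnect()
}

run().catch(console.error)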

Unfortunately, this results in a different error if the topic already exists, but that's a separate question, and I suspect that error is more informational than breaking.

{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}
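
One way to avoid the "already exists" error is to check for the topic before creating it. The following is a minimal sketch, not something I have run against every setup: ensureTopic is a hypothetical helper name, and it assumes admin is an already-connected KafkaJS admin client (it relies on the admin client's listTopics and createTopics methods). There is still a small race if another client creates the topic between the two calls.

```javascript
// Hypothetical helper: create `topic` only if the cluster does not list it yet.
// Assumes `admin` is an already-connected KafkaJS admin client.
async function ensureTopic(admin, topic) {
  const existing = await admin.listTopics()
  if (existing.includes(topic)) {
    return false // topic was already there, nothing created
  }
  await admin.createTopics({
    waitForLeaders: true, // block until partition leaders are elected
    topics: [{ topic }],
  })
  return true // topic was created by this call
}
```

In run() above, `await ensureTopic(admin, 'myRandomTopicString123')` would take the place of the direct createTopics call.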

EDIT: the above settings do require that your Kafka instance is properly configured. It is possible to have leadership elections never resolve, in which case KafkaJS will still complain about leadership elections!

In my experience, this has happened when a Kafka broker was stopped without being de-registered from ZooKeeper, meaning ZooKeeper is waiting for a response from something that no longer exists.
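
On the second question (not dropping messages): awaiting producer.send at least turns a failure into a rejected promise instead of losing it silently. If elections are merely slow rather than stuck, a bounded retry around the send is one option. This is a rough sketch under assumptions: sendWithRetry, attempts, and delayMs are hypothetical names, and producer is assumed to be an already-connected KafkaJS producer. KafkaJS also retries some errors internally, so this may be redundant depending on the client's retry settings.

```javascript
// Hypothetical helper: retry producer.send a bounded number of times.
// Assumes `producer` is an already-connected KafkaJS producer.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

async function sendWithRetry(producer, record, { attempts = 5, delayMs = 500 } = {}) {
  let lastError
  for (let i = 0; i < attempts; i++) {
    try {
      return await producer.send(record)
    } catch (err) {
      lastError = err
      // Wait before the next attempt, except after the final failure
      if (i < attempts - 1) await sleep(delayMs)
    }
  }
  // Every attempt failed: rethrow the most recent error to the caller
  throw lastError
}
```

If the election never resolves, this still fails after the last attempt, which is the point: the caller sees the error and can decide what to do with the unsent message.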

slifty