
I am implementing node-rdkafka in our service, but I keep hitting the error Broker: Unknown member. The same issue on GitHub is https://github.com/confluentinc/confluent-kafka-dotnet/issues/1464, where they say the consumer is reusing the same group id to retry or delay, but I did not find any retry or delay logic in my code. There is also https://github.com/confluentinc/confluent-kafka-python/issues/1004, but I have rechecked all consumer group ids and they are unique.

The node-rdkafka producer config is as follows:

      this.producer = new Producer({
        "client.id": this.cliendID,
        "metadata.broker.list": this.brokerList,
        "compression.codec": "lz4",
        "retry.backoff.ms": 200,
        "socket.keepalive.enable": true,
        "queue.buffering.max.messages": 100000,
        "queue.buffering.max.ms": 1000,
        "batch.num.messages": 1000000,
        "transaction.timeout.ms": 2000,
        "enable.idempotence": false,
        "max.in.flight.requests.per.connection": 1,
        "debug": this.debug,
        "dr_cb": true,
        "retries": 0,
        "log_cb": (_: any) => console.log(`log_cb =>`, _),
        "sasl.username": this.saslUsername,
        "sasl.password": this.saslPassword,
        "sasl.mechanism": this.saslMechanism,
        "security.protocol": this.securityProtocol
      }, {
        "acks": -1
      })
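For context on the `"dr_cb": true` setting above: in node-rdkafka, delivery reports only surface if the producer polls, and they arrive on the `delivery-report` event. A minimal sketch of that wiring (the topic name and payload here are made up for illustration, not our real code):

```typescript
// Illustrative only: how "dr_cb": true surfaces delivery reports.
// `producer` stands in for the Producer configured above.
producer.connect();
producer.on("ready", () => {
  producer.setPollInterval(100); // without polling, no delivery-report events fire
  producer.produce("example-topic", null, Buffer.from("payload"), "key", Date.now());
});
producer.on("delivery-report", (err, report) => {
  if (err) console.error("delivery failed", err);
  else console.log("delivered", report.topic, report.partition, report.offset);
});
```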

The node-rdkafka consumer config is as follows:

      this.consumer = new KafkaConsumer({
        "group.id": this.groupID,
        "metadata.broker.list": this.brokerList,
        "sasl.username": this.saslUsername,
        "sasl.password": this.saslPassword,
        "enable.auto.commit": false,
        "auto.commit.interval.ms": 2000,
        "session.timeout.ms": 45000,
        "max.poll.interval.ms": 300000,
        "heartbeat.interval.ms": 3000,
        "api.version.request.timeout.ms": 10000,
        "max.in.flight.requests.per.connection": 1,
        "debug": this.debug,
        "sasl.mechanism": this.saslMechanism,
        "log.connection.close": true,
        "log.queue": true,
        "log_level": 7,
        "log.thread.name": true,
        "isolation.level": "read_committed",
        "ssl.ca.location": "/etc/ssl/certs/",
        "log_cb": (_: any) => console.log(`log_cb =>`, _),
        "security.protocol": this.securityProtocol
      }, {})

      await new Promise(resolve => {

        this.consumer?.connect()
        this.consumer?.on('ready', () => {
          try {
            this.consumer?.subscribe(subscriptions)
            this.consumer?.consume()
            console.log('[SUCCESS] Subscribe Event => all event')
          } catch (err) {
            console.log('[FAILED] Subscribe => all event')
            console.log(err)
          }

          resolve(this.consumer)
        }).on('data', async (data) => {

          this.topicFunctionMap[data.topic]({
            partition: data.partition,
            topic: data.topic,
            message: {
              key: data.key,
              offset: data.offset.toString(),
              size: data.size,
              value: data.value,
              timestamp: data.timestamp?.toString()
            }
          } as ISubsCallbackParam)

          this.consumer?.commitSync({
            topic: data.topic,
            offset: data.offset,
            partition: data.partition
          })
        })
      })

With this configuration the consumer is able to receive events, but not for long: after roughly two hours it randomly starts throwing that error. I am not sure whether the cause is the manual commit or our handler function taking too long; I have tried both async and sync commits with no difference, so the commit style does not seem to depend on our function.
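One detail about the commit in the `data` handler above: the low-level commitSync({topic, partition, offset}) call commits the offset exactly as given, while the Kafka convention is to commit the offset of the *next* message to read. node-rdkafka's commitMessageSync helper handles the +1 for you. A sketch of the two equivalent forms, assuming `data` is the message object from the 'data' event:

```typescript
// Sketch: two equivalent ways to commit after handling a message.
// `data` is the message delivered by the "data" event above.

// Helper that commits data.offset + 1 for you:
this.consumer?.commitMessageSync(data)

// Or the explicit low-level form, committing the next offset to read:
this.consumer?.commitSync({
  topic: data.topic,
  partition: data.partition,
  offset: data.offset + 1
})
```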

Suppose it is because our handler function takes too long and that gets our consumer kicked out of the group. That looks like the likely suspect, especially after I found an additional error: Broker: Specified group generation id is not valid

source: https://github.com/confluentinc/confluent-kafka-dotnet/issues/1155
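If the handler really can exceed max.poll.interval.ms, one mitigation I have seen suggested is to leave flowing mode and pull messages one at a time, so that each consume() call doubles as the poll that keeps the member alive, and to commit only after the handler resolves. A sketch under that assumption (handle() is a hypothetical stand-in for this.topicFunctionMap[topic](...), not real code from our service):

```typescript
// Sketch: non-flowing consumption so long handlers don't starve the poll.
// `consumer` is the KafkaConsumer configured above; handle() is hypothetical.
function pollLoop(): void {
  consumer.consume(1, (err: any, messages: any[]) => {
    if (err) {
      console.error("[CONSUME_ERR]", err)
      return void setTimeout(pollLoop, 1000) // back off briefly on error
    }
    const msg = messages[0]
    if (!msg) return void setImmediate(pollLoop) // nothing fetched, poll again
    Promise.resolve(handle(msg))                  // long-running work
      .then(() => consumer.commitMessageSync(msg)) // commit only on success
      .catch((e) => console.error("[HANDLER_ERR]", e))
      .finally(pollLoop)
  })
}
```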

It says I need to increase the session timeout, so I raised it to "session.timeout.ms": 300000 (5 minutes) and kept "heartbeat.interval.ms": 3000. I found in a GitHub issue that the heartbeat interval should be at most one third of the session timeout, so I figured 3 seconds would be fine.

Using "session.timeout.ms": 300000 and "heartbeat.interval.ms": 3000, the consumer is able to consume and lasts for a long time, but the problems are:

  • the first time with this config, it runs fine, taking about 0-2 seconds to receive a message
  • after a while it still receives messages, but they take 1-10 seconds to arrive

The detailed error:

received event => onCustomerServiceRegister
[COMMIT_ERR]  LibrdKafkaError: Broker: Unknown member
    at Function.createLibrdkafkaError [as create] (/src/app/node_modules/node-rdkafka/lib/error.js:454:10)
    at KafkaConsumer.Client._errorWrap (/src/app/node_modules/node-rdkafka/lib/client.js:481:29)
    at KafkaConsumer.commitSync (/src/app/node_modules/node-rdkafka/lib/kafka-consumer.js:560:8)
    at KafkaRDConnect.<anonymous> (/src/app/dist/events/connectors/kafkaRD.js:240:110)
    at step (/src/app/dist/events/connectors/kafkaRD.js:53:23)
    at Object.next (/src/app/dist/events/connectors/kafkaRD.js:34:53)
    at /src/app/dist/events/connectors/kafkaRD.js:28:71
    at new Promise (<anonymous>)
    at __awaiter (/src/app/dist/events/connectors/kafkaRD.js:24:12)
    at KafkaConsumer.<anonymous> (/src/app/dist/events/connectors/kafkaRD.js:213:72)
    at KafkaConsumer.emit (node:events:376:20)
    at KafkaConsumer.EventEmitter.emit (node:domain:470:12)
    at /src/app/node_modules/node-rdkafka/lib/kafka-consumer.js:488:12 {
