
I am using the kafka-node module to send messages to Kafka in a clustered environment, where I have a topic with 3 partitions and a replication factor of 3.

Topic describe is -

Topic:clusterTopic      PartitionCount:3        ReplicationFactor:3    Configs:min.insync.replicas=2,segment.bytes=1073741824
        Topic: clusterTopic     Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: clusterTopic     Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
        Topic: clusterTopic     Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 1,2,3

Producer config -

        "requireAcks": 1,
        "attributes": 2,
        "partitionerType": 2,
        "retries": 2

When I send data it uses the cyclic partitioner type (2), i.e. messages are distributed across the partitions in round-robin fashion.

When I follow the steps below:

  • Get a HighLevelProducer instance connected to kafka:9092,kafka:9093
  • send a message
  • stop the Kafka broker on kafka:9092 manually
  • try to send another message with the HighLevelProducer; send() triggers the callback with the error TimeoutError: Request timed out after 30000ms (a sketch of the send call follows below)
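
A rough sketch of those send calls (the payload contents and logging are illustrative; with kafka-node the per-message compression is passed as attributes on the payload):

    const payloads = [
      { topic: 'clusterTopic', messages: 'some message', attributes: 2 } // snappy compression
    ];

    producer.send(payloads, (err, result) => {
      if (err) {
        // After stopping the broker on 9092 this fires with
        // TimeoutError: Request timed out after 30000ms
        console.error('send failed:', err);
        return;
      }
      console.log('sent:', result); // e.g. { clusterTopic: { '0': 42 } } -> partition/offset
    });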

What I am expecting is that if a partition is not accessible (because its broker is down), the producer should automatically send the data to the next available partition. Instead I am losing the message because of the exception.

The exception is as follows -

  TimeoutError: Request timed out after 3000ms
    at new TimeoutError (\package\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
    at Timeout.timeoutId._createTimeout [as _onTimeout] (\package\node_modules\kafka-node\lib\kafkaClient.js:980:14)
    at ontimeout (timers.js:424:11)
    at tryOnTimeout (timers.js:288:5)
    at listOnTimeout (timers.js:251:5)
    at Timer.processTimers (timers.js:211:10)
(node:56416) [DEP0079] DeprecationWarning: Custom inspection function on Objects via .inspect() is deprecated
  kafka-node:KafkaClient kafka-node-client reconnecting to kafka1:9092 +3s
  kafka-node:KafkaClient createBroker kafka1 9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kafka1:9092 +3s
  kafka-node:KafkaClient createBroker kafka1 9092 +0ms
– Anand Jain

1 Answer


Please share your bootstrap servers config to confirm, but based on the information at hand I believe you are experiencing the following:

  • You have min.insync.replicas set to 2
  • You have acks set to 1

With these settings, the producer will send the event to the leader replica and assume the message is safe.

If the leader fails immediately after the send, before the followers have caught up, you will lose the message because you are waiting for only one ack.

From a broker perspective, however, you are specifying that 2 in-sync replicas are required for the topic to be available. By default only in-sync replicas are allowed to be elected leader, so if the failure of the first broker leaves the followers out of sync, the partition may be forced offline. You can verify this in your tests; the above assumes default broker settings.

To rectify, try the following:

  1. If high availability is most important, set min.insync.replicas to 1 and acks to 1
  2. If data loss is not acceptable, set min.insync.replicas to 2 and acks to all
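
In kafka-node terms, option 2 would look roughly like this (a sketch; requireAcks: -1 is how acks=all is expressed in kafka-node, the ackTimeoutMs value is illustrative, and min.insync.replicas=2 is already set on the topic according to the describe output above):

    // Producer waits for all in-sync replicas to acknowledge before the callback fires
    const producer = new kafka.HighLevelProducer(client, {
      requireAcks: -1,     // equivalent of acks=all
      ackTimeoutMs: 5000,  // how long the leader waits for follower acks (illustrative value)
      partitionerType: 2
    });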

You can also set unclean.leader.election.enable to true for high availability, as this allows an out-of-sync replica to be elected leader, but then there is the chance of data loss.

– Charl
  • Hey, I don't think the problem is with the acks and in-sync replicas (I tried what you suggested and still get the same error on the producer); it's around the partitionerType, which is cyclic. The data is sent to partitions 1, 2 and then 3, but if I stop the broker hosting one of those partitions, the module keeps trying to send to that same partition. I would like it to discover the next available partition/broker and resend the data – Anand Jain Oct 04 '19 at 05:39
  • Did you try setting unclean.leader.election.enable to true? If the producer sends a request to a dead broker, it will fail and subsequently try another broker, but only if there is a leader for that partition. Looking at your metrics from a broker perspective, are you seeing any offline partitions when this happens? – Charl Oct 04 '19 at 09:47
  • I tried setting unclean.leader.election.enable to true; I am still getting the connection error for the broker I shut down. It retries 5 times and then stops with a KafkaJSNumberOfRetriesExceeded exception; the preceding error is Connection error: connect ECONNREFUSED to the broker – Anand Jain Oct 04 '19 at 11:00
  • Could you set this in your producer config and send the log: logLevel: logLevel.DEBUG – Charl Oct 04 '19 at 12:11
  • Updated the question with the exception – Anand Jain Oct 04 '19 at 15:01