I am using the kafka-node module to send messages to Kafka in a clustered environment, where I have a topic with 3 partitions and a replication factor of 3.
The topic description is -
Topic:clusterTopic PartitionCount:3 ReplicationFactor:3 Configs:min.insync.replicas=2,segment.bytes=1073741824
Topic: clusterTopic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: clusterTopic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
Topic: clusterTopic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
Producer config -
"requireAcks": 1,
"attributes": 2,
"partitionerType": 2,
"retries": 2
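For reference, here is a minimal sketch of how I understand these settings map onto kafka-node. The helper names `createProducer` and `buildPayload` are hypothetical (they are not part of the library), and the `kafka` argument is assumed to be the kafka-node module itself. `retries` is left out because this post does not show where it is passed.

```javascript
// Sketch only: `kafka` is expected to be the kafka-node module, passed in
// as an argument so the wiring can be read without a running cluster.
function createProducer(kafka, settings) {
  // kafkaHost takes a comma-separated list of host:port pairs.
  const client = new kafka.KafkaClient({ kafkaHost: settings.kafkaHost });
  // requireAcks and partitionerType are producer-level options
  // (partitionerType 2 selects the cyclic partitioner).
  return new kafka.HighLevelProducer(client, {
    requireAcks: settings.requireAcks,
    partitionerType: settings.partitionerType
  });
}

// attributes is set per payload rather than on the producer
// (2 selects snappy compression).
function buildPayload(topic, message, settings) {
  return { topic: topic, messages: [message], attributes: settings.attributes };
}

// Values copied from the config above; kafkaHost comes from the steps below.
const settings = {
  kafkaHost: 'kafka:9092,kafka:9093',
  requireAcks: 1,
  attributes: 2,
  partitionerType: 2
};
```

With the real module this would be called as `createProducer(require('kafka-node'), settings)`, and messages would be sent with `producer.send([buildPayload('clusterTopic', 'hello', settings)], callback)` once the producer has emitted `ready`.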
When I send data, the producer uses the cyclic partitioner type (2), so messages are distributed across the partitions in round-robin fashion.
The problem occurs when I follow the steps below:
- Get a HighLevelProducer instance connected to kafka:9092,kafka:9093
- send a message
- stop the kafka-server:9092 manually
- try to send another message with the HighLevelProducer; send() triggers the callback with the error: TimeoutError: Request timed out after 30000ms
What I expect is that if a partition is not accessible (because a broker is down), the producer should automatically send the data to the next available partition. Instead, I am losing the message because of the exception.
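To make the expected behaviour concrete, here is a rough sketch of the selection logic I have in mind. `createFailoverPicker` is a hypothetical helper, not something kafka-node provides. It assumes the caller already knows which broker ids are down and that the partition leaders stay as listed in the topic description (in a real cluster, leadership may move to another replica).

```javascript
// Hypothetical helper, not part of kafka-node: cycles through partitions
// round-robin and skips any partition whose leader broker is marked as down.
function createFailoverPicker(partitionLeaders) {
  const partitions = Object.keys(partitionLeaders)
    .map(Number)
    .sort((a, b) => a - b);
  let cursor = 0; // index of the partition to try first on the next call

  return function pick(downBrokers) {
    for (let i = 0; i < partitions.length; i++) {
      const partition = partitions[(cursor + i) % partitions.length];
      if (!downBrokers.has(partitionLeaders[partition])) {
        // Continue the cycle from the partition after the one just used.
        cursor = (cursor + i + 1) % partitions.length;
        return partition;
      }
    }
    return null; // no partition has a reachable leader
  };
}

// Leaders taken from the topic description: partition -> leader broker id.
const pick = createFailoverPicker({ 0: 1, 1: 2, 2: 3 });
```

Calling `pick(new Set())` repeatedly walks through partitions 0, 1, 2 in order, while `pick(new Set([3]))` skips partition 2 because its leader is broker 3. The returned number could then be set as the `partition` field of the payload passed to send().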
The exception is as follows -
TimeoutError: Request timed out after 3000ms
at new TimeoutError (\package\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
at Timeout.timeoutId._createTimeout [as _onTimeout] (\package\node_modules\kafka-node\lib\kafkaClient.js:980:14)
at ontimeout (timers.js:424:11)
at tryOnTimeout (timers.js:288:5)
at listOnTimeout (timers.js:251:5)
at Timer.processTimers (timers.js:211:10)
(node:56416) [DEP0079] DeprecationWarning: Custom inspection function on Objects via .inspect() is deprecated
kafka-node:KafkaClient kafka-node-client reconnecting to kafka1:9092 +3s
kafka-node:KafkaClient createBroker kafka1 9092 +1ms
kafka-node:KafkaClient kafka-node-client reconnecting to kafka1:9092 +3s
kafka-node:KafkaClient createBroker kafka1 9092 +0ms