I have a topic in kafka as internal
. I have created the topic using the below command
/opt/kafka/bin/kafka-topics.sh
--create --zookeeper zookeeper:2181
--replication-factor 3 -partitions 6
--topic internal
I need to consume all the message in three different node servers. So I am using kafka-node module as a consumer group with different consumer name. I have created a consumer group name called group1
, group2
, group3
.
Everything working fine, I can consume all the messages in all consumers.
But when any broker is down, the consumer is not getting any message. When I list all the consumer groups, it does not showing the specific group ID.
(e.g) If nodeserver 1
is down there is no group available in broker called group1
Even if I restart the node server, it does not create any group in broker and not consuming any messages in the respective node server. But when the broker is up, and the node server is restarted, it is creating a group in broker and the node server can receive message.
consumer.js
const options = {
kafkaHost: process.env.KAFKA_HOST,
groupId: group_id, //group1 (or) group2 (or) group3
autoCommit: true,
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest',
outOfRangeOffset: 'earliest',
migrateHLC: false,
migrateRolling: true,
fetchMaxBytes: 1024 * 1024 * 10,
commitOffsetsOnFirstJoin: true,
onRebalance: (isAlreadyMember, callback) => {
log.info({"ALREADY_MEMBER_isAlreadyMember": isAlreadyMember});
callback();
}
};
const consumerGroup = new ConsumerGroup(options, process.env.KAFKA_TOPIC);
// On receiving message
consumerGroup.on("message", handMessage); //handMessage is where the message has been handled
// On error receiving message
consumerGroup.on('error', function(err) {
log.debug({"type": "KAFKA_CONSUMER_ERROR", "msg": err});
});
// On error receiving message
consumerGroup.on('offsetOutOfRange', function(err) {
log.debug({"type": "KAFKA_CONSUMER_RANGE_ERROR", "msg": err});
});
UPDATE - 1
Even if I updated offsets.topic.replication.factor
as 2
or 3
, I am having the same issue. When I any broker is down, the respective node server is not consuming the message. And also when I show list of groups in broker, it shows only group2
and group3
. But the group1
is not there when the broker1
is down. Even if I restart the node consumer, the group1
is not getting created.
server.properties
broker.id=1
listeners=INSIDE://:9092,OUTSIDE://:9094
advertised.listeners=INSIDE://:9092,OUTSIDE://:9094
listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-d3f14c9ddf0a
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
zookeeper.connection.timeout.ms=16000
group.initial.rebalance.delay.ms=0
inter.broker.listener.name=INSIDE
advertised.port=9094
port=9092
auto.create.topics.enable=false
UPDATE - 2
When the broker is down, the group coordinator is getting deleted and it is not automatically re-electing.
Can you guys tell me what I did wrong? Or is there anything else I need to update?