I am trying to connect to my local kafka. I created a topic "test_producer" and created a producer for it using:
bin/kafka-console-producer.sh --topic test_producer --bootstrap-server localhost:9092
Now i am trying to connect this using node-kafka in my node application. Normal consumer is working and receiving messages but if i create a consumer group it is throwing request timeout exception
Consumer group code:
import { ConsumerGroup } from "kafka-node";
const options = {
kafkaHost: "127.0.0.1:9092",
batch: undefined,
ssl: true,
groupId: "ExampleGroup",
sessionTimeout: 15000,
protocol: ["roundrobin"],
encoding: "utf8",
fromOffset: "latest", // default
commitOffsetsOnFirstJoin: true,
outOfRangeOffset: "earliest",
onRebalance: (isAlreadyMember, callback) => {
callback();
},
};
const consumerGroup = new ConsumerGroup(options, ["test_producer"]);
consumerGroup.on("message", (message) => {
console.log(`group:${message}`);
});
But if i create consumer it is able to connect and receive messages.
Edit:
config/server.properties
############################# Server Basics #############################
broker.id=0
############################# Socket Server Settings #############################
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/home/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
delete.topic.enable = true
advertised.listeners=PLAINTEXT://127.0.0.1:9092
listeners = PLAINTEXT://127.0.0.1:9092