I run and setuping lots of projects related to Kafka but I am facing lots of issues and errors. I created a producer-consumer repo. but it didn't work properly. I will make one application.in that web-app I will post from one account to another account using Kafka.
how to connect with the Kafka cluster. I found many solutions but I can't get any. I have created a broker. but I don't know where it works.
docker-compose.yml
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
2181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
zookeeper
ports:
9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
producer.js
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
brokers: [`localhost:9092`],
clientId: "client-id",
});
const producer = kafka.producer();
var sendMessage = async () => {
await producer.connect();
console.log("connected...");
await producer.send({
topic: "test-topic",
messages: [
{ key: "key1", value: "hello world" },
{ key: "key2", value: "hey hey!" },
],
});
await producer.disconnect();
};
sendMessage();
consumer.js
const { Kafka } = require("kafkajs");
run();
async function run() {
try {
const kafka = new Kafka({
brokers: [`localhost:9092`],
clientId: "client-id",
});
const consumer = kafka.consumer({ groupId: "jainam" });
console.log("Connecting.....");
await consumer.connect();
console.log("Connected!");
await consumer.subscribe({
topic: "test-topic",
fromBeginning: true,
});
await consumer.run({
eachMessage: async (result) => {
console.log(
`RVD Msg ${result.message.value} on partition ${result.partition}`
);
},
});
} catch (ex) {
console.error(`Something bad happened ${ex}`);
} finally {
}
}
topic.js
//const Kafka = require("kafkajs").Kafka
const { Kafka } = require("kafkajs");
run();
async function run() {
try {
const kafka = new Kafka({
clientId: "myapp",
brokers: ["localhost:9092"],
});
const admin = kafka.admin();
console.log("Connecting.....");
await admin.connect();
console.log("Connected!");
//A-M, N-Z
await admin.createTopics({
topics: [
{
topic: "Users",
numPartitions: 2,
},
],
});
console.log("Created Successfully!");
await admin.disconnect();
} catch (ex) {
console.error(`Something bad happened ${ex}`);
} finally {
process.exit(0);
}
}