0

I run and setuping lots of projects related to Kafka but I am facing lots of issues and errors. I created a producer-consumer repo. but it didn't work properly. I will make one application.in that web-app I will post from one account to another account using Kafka.

how to connect with the Kafka cluster. I found many solutions but I can't get any. I have created a broker. but I don't know where it works.

docker-compose.yml

version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
2181:2181

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
zookeeper
ports:
9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

producer.js

const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  brokers: [`localhost:9092`],
  clientId: "client-id",
});

const producer = kafka.producer();
var sendMessage = async () => {
  await producer.connect();
  console.log("connected...");
  await producer.send({
    topic: "test-topic",
    messages: [
      { key: "key1", value: "hello world" },
      { key: "key2", value: "hey hey!" },
    ],
  });
  await producer.disconnect();
};

sendMessage();

consumer.js

const { Kafka } = require("kafkajs");

run();
async function run() {
  try {
    const kafka = new Kafka({
      brokers: [`localhost:9092`],
      clientId: "client-id",
    });
    const consumer = kafka.consumer({ groupId: "jainam" });
    console.log("Connecting.....");
    await consumer.connect();
    console.log("Connected!");

    await consumer.subscribe({
      topic: "test-topic",
      fromBeginning: true,
    });

    await consumer.run({
      eachMessage: async (result) => {
        console.log(
          `RVD Msg ${result.message.value} on partition ${result.partition}`
        );
      },
    });
  } catch (ex) {
    console.error(`Something bad happened ${ex}`);
  } finally {
    
  }
}

topic.js

//const Kafka = require("kafkajs").Kafka
const { Kafka } = require("kafkajs");

run();
async function run() {
  try {
    const kafka = new Kafka({
      clientId: "myapp",
      brokers: ["localhost:9092"],
    });

    const admin = kafka.admin();
    console.log("Connecting.....");
    await admin.connect();
    console.log("Connected!");
    //A-M, N-Z
    await admin.createTopics({
      topics: [
        {
          topic: "Users",
          numPartitions: 2,
        },
      ],
    });
    console.log("Created Successfully!");
    await admin.disconnect();
  } catch (ex) {
    console.error(`Something bad happened ${ex}`);
  } finally {
    process.exit(0);
  }
}

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • For starters, are there any errors in your docker startup commands for the broker? What errors are you seeing when you run the code? – OneCricketeer Nov 19 '22 at 13:08

0 Answers0