I am using node-fluent/kafka-streams to connect to a Kafka cluster launched using Strimzi. The quick start example does not emit any messages; it only logs `stream started, as kafka consumer is ready.` I can confirm that messages are being published and consumed successfully using the producer and consumer pods described in the quick start. What is missing?

const {KafkaStreams} = require("kafka-streams");
const kafkaConfig = {
  "noptions": {
      "metadata.broker.list": "localhost:9092",
      "group.id": "kafka-streams-test-native",
      "client.id": "kafka-streams-test-name-native",
      "event_cb": true,
      "compression.codec": "snappy",
      "api.version.request": true,
      "socket.keepalive.enable": true,
      "socket.blocking.max.ms": 100,
      "enable.auto.commit": false,
      "auto.commit.interval.ms": 100,
      "heartbeat.interval.ms": 250,
      "retry.backoff.ms": 250,
      "fetch.min.bytes": 100,
      "fetch.message.max.bytes": 2 * 1024 * 1024,
      "queued.min.messages": 100,
      "fetch.error.backoff.ms": 100,
      "queued.max.messages.kbytes": 50,
      "fetch.wait.max.ms": 1000,
      "queue.buffering.max.ms": 1000,
      "batch.num.messages": 10000
  },
  "tconf": {
      "auto.offset.reset": "earliest",
      "request.required.acks": 1
  },
  "batchOptions": {
      "batchSize": 5,
      "commitEveryNBatch": 1,
      "concurrency": 1,
      "commitSync": false,
      "noBatchCommits": false
  }
}

const kafkaStreams = new KafkaStreams(kafkaConfig);
kafkaStreams.on('connection', (s) => console.log("Connected."))

const kafkaTopicName = "m001_limits";
const stream = kafkaStreams.getKStream(kafkaTopicName);

stream.forEach(message => console.log(message));
stream.start().then(() => {
    console.log("stream started, as kafka consumer is ready.");
}, error => {
    console.log("stream failed to start: " + error);
});
OneCricketeer
Chain Head
  • Try `const stream = kafkaStreams.getKStream(kafkaTopicName).forEach(message => console.log(message)); stream.start();` – Rahul Sharma Sep 10 '22 at 12:17
  • It's hard to say without any errors, logs, etc. But the code is connecting to `localhost:9092`, which will probably not get it connected to Strimzi. If you run inside the Kubernetes cluster, you need to connect to the bootstrap address of the Kafka broker (you can get it from the `.status` section of the Kafka CR: `kubectl get kafka -o yaml`). If you try to connect from outside your Kubernetes cluster, you need to set up an external listener in Strimzi. – Jakub Sep 10 '22 at 12:55
  • @Jakub I have `kubectl port-forward svc/... 9092:9092`. I am not sure if there is a `connection` event that can be handled to display a message about a successful connection. At least, I do not see a connection-specific error message. Other than that, there is no error message, stream message, etc. – Chain Head Sep 10 '22 at 18:07
  • @RahulSharma that didn't work - `stream.start is not a function` – Chain Head Sep 10 '22 at 18:49
  • Kafka doesn't work on a simple port forward. How did you configure the broker external listeners in Strimzi? NodeIP? Did you try using Kafka CLI tools outside of the pods first? – OneCricketeer Sep 11 '22 at 02:06
  • No further configuration was done. What other ports need to be forwarded? No, I haven't used any other CLI tools outside of Kubernetes. – cogitoergosum Sep 11 '22 at 12:39
  • Kafka has its own discovery protocol, so it is not easy to make it work with port-forward because it would require additional configurations etc. As suggested by OneCricketeer, you need to configure the external listener in Strimzi in the `Kafka` CR. Check the Strimzi docs for the different options and mechanisms available. – Jakub Sep 11 '22 at 14:15
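
To illustrate the point made in the comments about `localhost:9092`, here is a minimal sketch that takes the bootstrap address from the environment instead of hard-coding it. The helper name `buildKafkaConfig` and the environment variable `KAFKA_BOOTSTRAP` are hypothetical, and the example address assumes the Strimzi quick start defaults (cluster `my-cluster` in namespace `kafka`). The real address should be taken from the `.status` section of the Kafka CR. This only changes which address the client contacts first; it does not make a plain port-forward work, because the brokers still advertise their own addresses.

```javascript
// Hypothetical helper: build the kafka-streams config with the bootstrap
// address taken from an environment variable (name is an assumption).
function buildKafkaConfig(env = process.env) {
  // Falls back to the address used in the question when the variable is unset.
  const bootstrap = env.KAFKA_BOOTSTRAP || "localhost:9092";
  return {
    noptions: {
      "metadata.broker.list": bootstrap,
      "group.id": "kafka-streams-test-native",
      "client.id": "kafka-streams-test-name-native",
      "event_cb": true,
      "enable.auto.commit": false
    },
    tconf: {
      "auto.offset.reset": "earliest"
    }
  };
}

// Example, assuming the Strimzi quick start names and an in-cluster client:
//   KAFKA_BOOTSTRAP=my-cluster-kafka-bootstrap.kafka.svc:9092 node app.js
// The resulting object would then be passed to `new KafkaStreams(...)`
// exactly as `kafkaConfig` is in the question.
```

When the client runs outside the cluster, the same variable would instead hold the bootstrap address of whatever external listener is configured in the `Kafka` CR.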

0 Answers