I am new to apache kafka. I am trying to implement word count example usig kafka streaming with node.js. I am using https://www.npmjs.com/package/kafka-streams library. My project setup is as below:
I am going to produce 1,00,000 messages on a topic using producer-pref-test provided by apache kafka. I will create a consumer who is going to consume this stream of messages created on topic. I want to perform window operation. I'll send the word count result of 10 second (window time) to some other topic or just print to console.
I have checked the examples given at https://github.com/nodefluent/kafka-streams/tree/master/examples but, window.js example is not working and giving some error.
I have tried to implement word count and window operation by my own, but its not sending me the aggregated count value in window.
My code :
const {KafkaStreams} = require("kafka-streams");
const config = {
//zkConStr: "localhost:2181/",
kafkaHost: "localhost:9092",
groupId: "kafka-streams-test",
clientName: "kafka-streams-test-name",
workerPerPartition: 1,
options: {
sessionTimeout: 8000,
protocol: ["roundrobin"],
fromOffset: "earliest", //latest
fetchMaxBytes: 1024 * 100,
fetchMinBytes: 1,
fetchMaxWaitMs: 10,
heartbeatInterval: 250,
retryMinTimeout: 250,
autoCommit: true,
autoCommitIntervalMs: 1000,
requireAcks: 0,
//ackTimeoutMs: 100,
//partitionerType: 3
}
};
const kafkaStreams = new KafkaStreams(config);
const consumeStream = kafkaStreams.getKStream("loadTopic6");
const windowPeriod = 10 * 1000; // 10 seconds
const from = Date.now();
const to = Date.now() + windowPeriod;
const {stream, abort} = consumeStream.window(from, to);
stream
.map(keyValueMapperEtl)
.countByKey("key", "count")
.map(kv => kv.key + " " + kv.count)
.tap(kv => console.log(kv))
.to("output-topic");
//start the stream
consumeStream.start();
//setTimeout(abort,50000000000);
function keyValueMapperEtl(message){
return {
key: message.value,
value: undefined // not required
};
}
I have done some modification to the demos provided but it is still not working.
Please help me what wrong I am doing and what am I missing.
Thanks