
I am new to Apache Kafka. I am trying to implement a word count example using Kafka Streams with Node.js. I am using the https://www.npmjs.com/package/kafka-streams library. My project setup is as follows:

I am going to produce 100,000 messages on a topic using the producer-perf-test tool provided by Apache Kafka. I will create a consumer that consumes this stream of messages from the topic. I want to perform a window operation: for each 10-second window, I'll send the word count result to another topic or just print it to the console.
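A 10-second tumbling window assigns each record to the window starting at `Math.floor(timestamp / windowMs) * windowMs`. The sketch below is plain JavaScript with no Kafka dependency; it assumes each record is a hypothetical `{ word, timestamp }` object (timestamp in milliseconds) and shows the per-window totals described above, computed over a batch of records.

```javascript
// Hypothetical record shape: { word: string, timestamp: number } (ms since epoch).
const WINDOW_MS = 10 * 1000; // 10-second tumbling windows

// Start of the tumbling window that contains `timestamp`.
function windowStart(timestamp, windowMs) {
  return Math.floor(timestamp / windowMs) * windowMs;
}

// Groups records into tumbling windows and counts words inside each window.
// Returns Map<windowStart, Map<word, count>>.
function countWordsPerWindow(records, windowMs) {
  const windows = new Map();
  for (const { word, timestamp } of records) {
    const start = windowStart(timestamp, windowMs);
    if (!windows.has(start)) windows.set(start, new Map());
    const counts = windows.get(start);
    counts.set(word, (counts.get(word) || 0) + 1);
  }
  return windows;
}
```

For records `word1@1000`, `word1@4000`, `word2@9000` and `word1@12000`, this yields one total per window: `{word1: 2, word2: 1}` for the window starting at 0 and `{word1: 1}` for the window starting at 10000.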

I have checked the examples at https://github.com/nodefluent/kafka-streams/tree/master/examples, but the window.js example does not work and throws an error.

I have tried to implement the word count and window operation on my own, but it does not give me the aggregated count for the window.

My code:

```javascript
const {KafkaStreams} = require("kafka-streams");

const config = {
    //zkConStr: "localhost:2181/",
    kafkaHost: "localhost:9092",
    groupId: "kafka-streams-test",
    clientName: "kafka-streams-test-name",
    workerPerPartition: 1,
    options: {
        sessionTimeout: 8000,
        protocol: ["roundrobin"],
        fromOffset: "earliest", //latest
        fetchMaxBytes: 1024 * 100,
        fetchMinBytes: 1,
        fetchMaxWaitMs: 10,
        heartbeatInterval: 250,
        retryMinTimeout: 250,
        autoCommit: true,
        autoCommitIntervalMs: 1000,
        requireAcks: 0,
        //ackTimeoutMs: 100,
        //partitionerType: 3
    }
};

const kafkaStreams = new KafkaStreams(config);
const consumeStream = kafkaStreams.getKStream("loadTopic6");

// Window covering the next 10 seconds, starting now
const windowPeriod = 10 * 1000; // 10 seconds
const from = Date.now();
const to = Date.now() + windowPeriod;

// window() returns the windowed stream plus an abort() function
const {stream, abort} = consumeStream.window(from, to);

stream
    .map(keyValueMapperEtl)          // message -> { key, value }
    .countByKey("key", "count")      // count occurrences per key
    .map(kv => kv.key + " " + kv.count)
    .tap(kv => console.log(kv))
    .to("output-topic");

//start the stream
consumeStream.start();

//setTimeout(abort,50000000000);

// Uses the whole message value as the key
function keyValueMapperEtl(message){
    return {
        key: message.value,
        value: undefined // not required
    };
}
```
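The `keyValueMapperEtl` function uses the whole message value as the key, so `countByKey` counts identical messages rather than individual words. If each message carries a line of text, a word count would first split the value into words. A minimal sketch, assuming `message.value` is either a string or a Node.js `Buffer` depending on the client and its encoding settings; the helper name `messageToWords` is hypothetical and not part of kafka-streams.

```javascript
// Hypothetical helper (not part of kafka-streams): turns one Kafka message
// into one { key, value } record per word, ready for a count-by-key step.
function messageToWords(message) {
  // Depending on the client/encoding, the value may be a string or a Buffer.
  const text = Buffer.isBuffer(message.value)
    ? message.value.toString("utf8")
    : String(message.value);

  return text
    .toLowerCase()
    .split(/\W+/)                     // split on runs of non-word characters
    .filter(word => word.length > 0)  // drop empty strings from edge punctuation
    .map(word => ({ key: word, value: 1 }));
}
```

Because it returns an array, it needs a one-to-many (flat-map style) step rather than `map`, which emits exactly one output per input message.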

I have made some modifications to the provided demos, but it still does not work.

Please help me understand what I am doing wrong and what I am missing.

Thanks

tarkikshah
  • What error do you get? – pedromss Aug 17 '17 at 09:17
  • After some changes to the window.js example, I no longer get an error. But I want the aggregated word count after the 10 sec window. Instead, after 10 sec it logs all the words line by line, whereas I am expecting the final aggregated count for the window. – tarkikshah Aug 17 '17 at 09:22
  • You mean your program doesn't terminate because you keep on consuming records? – pedromss Aug 17 '17 at 10:03
  • No. It's giving output, but it looks like word1-1 word1-2 word1-3... I want it to be word1-3 after 10 sec. – tarkikshah Aug 17 '17 at 10:04
  • That's not how real-time streams work. You are asking for a batch (or microbatch) result which is not a true streaming result. – Hans Jespersen Aug 18 '17 at 02:40
  • So what should my approach be then? Any suggestions? As far as I know, the same is possible with Apache Kafka using the Java implementation. And if this is not a true streaming scenario, what is the use case of the Kafka Streams API, since data can be consumed continuously with plain Kafka as well? – tarkikshah Aug 18 '17 at 04:53
  • This question might help: https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable – Matthias J. Sax Aug 20 '17 at 22:15
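The gap between the `word1-1 word1-2 word1-3` output and the desired `word1-3` is the gap between emitting every update (what the running `countByKey` in the question does) and emitting once per window after it has closed, which is what the linked question is about. A library-independent sketch of the latter, assuming records arrive roughly in timestamp order and using a hypothetical `createWindowedCounter(windowMs, onWindowClosed)` helper: counts are buffered per tumbling window, and the totals are handed to `onWindowClosed` when a record from a later window arrives or when `flush()` is called.

```javascript
// Hypothetical helper: buffers word counts for the current tumbling window and
// emits the final totals only after that window has closed.
// Assumes records arrive in (roughly) increasing timestamp order; a record
// older than the current window is dropped rather than re-opening it.
function createWindowedCounter(windowMs, onWindowClosed) {
  let currentStart = null; // start of the open window, or null if none yet
  let counts = new Map();  // word -> count within the open window

  function emitAndReset() {
    if (currentStart !== null && counts.size > 0) {
      onWindowClosed({
        windowStart: currentStart,
        windowEnd: currentStart + windowMs,
        counts: Object.fromEntries(counts),
      });
    }
    counts = new Map();
  }

  return {
    // Add one word observed at `timestamp` (ms since epoch).
    // Returns false if the record is late and was dropped.
    add(word, timestamp) {
      const start = Math.floor(timestamp / windowMs) * windowMs;
      if (currentStart === null) {
        currentStart = start;
      } else if (start > currentStart) {
        // A later window has started, so the open one can no longer change.
        emitAndReset();
        currentStart = start;
      } else if (start < currentStart) {
        return false; // late record: its window was already emitted
      }
      counts.set(word, (counts.get(word) || 0) + 1);
      return true;
    },
    // Emit whatever is buffered, e.g. on shutdown or from a timer.
    flush() {
      emitAndReset();
    },
  };
}
```

Feeding `word1` three times within the first 10 seconds produces nothing until a record from the next window arrives, at which point a single `{ word1: 3 }` result is emitted. Calling `flush()` from a timer covers the case where no newer record arrives to close the last window.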

0 Answers