Hypothetically, suppose the groupBy function were not available in Kafka Streams. Could I do the following to compute a word count and build a KTable on top of it? Note that I use "word-count-topic" twice in the topology. My use case is to build a value iteratively: for each new stream event, I want to look up the previous value and update it based on the event, keeping the latest value in the same topic that the KTable is built on.

KTable<String,Long> wordCountTable = builder.table("word-count-topic",Consumed.with(Serdes.String(), Serdes.Long()));

KStream<String,String> wordsStream = builder.stream("words-topic",Consumed.with(Serdes.String(), Serdes.String()));

KStream<String,String> msgStream = wordsStream
                                   .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                                   .selectKey((k,v) -> v);

// Join against the table declared above (wordCountTable); count is null for a word not seen yet.
msgStream.leftJoin(wordCountTable, (word, count) -> {
                if (count == null) return new WordCount(word, Long.valueOf(1));
                else return new WordCount(word, count + 1);
            })
            .mapValues((k, v) -> v.getCount())
            .to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));

streams = new KafkaStreams(builder.build(), props);
streams.start();
  • Does this answer your question? [Kafka Streams API: KStream to KTable](https://stackoverflow.com/questions/42937057/kafka-streams-api-kstream-to-ktable) – Bartosz Wardziński Apr 23 '20 at 15:04
  • No, it doesn't. "word-count-topic" is initially empty and, over time, holds the latest update for each key. But the update process is part of the same topology: it computes a new value from each stream event on "words-topic", writes it back to "word-count-topic", and thereby updates the previous value in the KTable. Can I do this? – Sarthak Sarathi Apr 23 '20 at 18:17

1 Answer

That should work. Why not just run the code?
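One thing worth checking when you run it: the table only sees a new count after the record has been written to "word-count-topic" and read back, so the lookup and the update are not a single atomic step. The sketch below is a plain-Java model of that loop, not Kafka Streams code. The class `FeedbackLoopSketch`, the method names, and the `lag` parameter (how many stream records are processed before the table catches up with the topic) are all hypothetical and exist only for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

class FeedbackLoopSketch {

    // Applies every pending topic record to the table, oldest first,
    // so the last write for a key wins (as in a changelog-style table).
    static void drain(Queue<Map.Entry<String, Long>> topic, Map<String, Long> table) {
        Map.Entry<String, Long> record;
        while ((record = topic.poll()) != null) {
            table.put(record.getKey(), record.getValue());
        }
    }

    // Models the read-update-write loop from the question.
    // lag (assumption, must be >= 1): number of words processed before
    // the table is refreshed from the topic.
    static Map<String, Long> run(List<String> words, int lag) {
        if (lag < 1) {
            throw new IllegalArgumentException("lag must be >= 1");
        }
        Map<String, Long> table = new HashMap<>();                  // stands in for the KTable
        Queue<Map.Entry<String, Long>> topic = new ArrayDeque<>();  // stands in for word-count-topic
        int processed = 0;
        for (String word : words) {
            Long previous = table.get(word);                        // the left-join lookup
            long updated = (previous == null) ? 1L : previous + 1;
            topic.add(Map.entry(word, updated));                    // write the new count back
            processed++;
            if (processed % lag == 0) {
                drain(topic, table);                                // table catches up
            }
        }
        drain(topic, table);                                        // apply whatever is still pending
        return table;
    }

    public static void main(String[] args) {
        List<String> words = List.of("a", "b", "a", "a");
        System.out.println(run(words, 1)); // table refreshed after every record
        System.out.println(run(words, 4)); // table refreshed only after all four records
    }
}
```

In this model, `lag = 1` gives a count of 3 for "a", while `lag = 4` gives 1, because every lookup happened before any write had been read back. How close a real topology gets to either extreme depends on timing and configuration, so it is worth testing with several records for the same key arriving close together.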

Matthias J. Sax