Following is the windowing operation I have written in Kafka Streams
KTable<Windowed<String>, Test> testWinAlerts = testRecords
.groupByKey()
.aggregate(new TestInitilizer(),
new minMaxCalculator(),
TimeWindows.of(TimeUnit.SECONDS.toMillis(5))
.advanceBy(TimeUnit.SECONDS.toMillis(1)),
MessageSerde,
"win-counts")
.filter((k,v)->{
//Some Operation
return (condition);
})
.toStream((k,v)->k.toString())
.to(Serdes.String(),MessageSerde,"Window-topic");
But in this operation, filter operation gets called each time new messege comes and updates aggregation. For every new messege alert is written into "Window-topic"
topic. Instead what I want is,
filter operation should be executed once per window and final result should be written to "Window-topic"
once per window(5,1). Is there any way we can do this and reduce these multiple calls?