The following is the windowing operation I have written in Kafka Streams:

KTable<Windowed<String>, Test> testWinAlerts = testRecords
    .groupByKey()
    .aggregate(new TestInitilizer(), 
               new minMaxCalculator(),
               TimeWindows.of(TimeUnit.SECONDS.toMillis(5))
                          .advanceBy(TimeUnit.SECONDS.toMillis(1)),
               MessageSerde,
               "win-counts")
    .filter((k, v) -> {
        // Some operation
        return (condition);
    })
    .toStream((k,v)->k.toString())
    .to(Serdes.String(),MessageSerde,"Window-topic");

But with this operation, the filter is called every time a new message arrives and updates the aggregation, so an alert is written to the "Window-topic" topic for every new message. What I want instead is for the filter to be executed once per window, with the final result written to "Window-topic" once per window (size 5 s, advance 1 s). Is there any way to do this and avoid these repeated calls?
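To make the number of updates concrete: with a 5-second window advancing by 1 second, each record falls into up to five overlapping windows, so a single message touches several window aggregates. The following is a minimal plain-Java sketch of that window assignment, assuming windows are aligned to the epoch. The class and method names (`HoppingWindows`, `windowStartsFor`) are made up for illustration and are not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (not the Kafka Streams API): which hopping windows contain a record.
class HoppingWindows {
    // Returns the start times (ms) of every window [start, start + sizeMs) that contains
    // timestampMs, assuming windows are aligned to the epoch and advance by advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window start that can still contain the timestamp, rounded down to the advance grid.
        long first = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record at t = 7.2 s with size 5 s and advance 1 s belongs to five windows.
        System.out.println(windowStartsFor(7_200L, 5_000L, 1_000L)); // prints [3000, 4000, 5000, 6000, 7000]
    }
}
```

Each of those windows produces its own updated aggregate, which is why the filter and the write to "Window-topic" fire several times per incoming message.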

OneCricketeer
rishi007bansod
    Possible duplicate of [How to send final kafka-streams aggregation result of a time windowed KTable?](https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable) – Dmitry Minkovsky Dec 18 '17 at 19:15
    Please note that Kafka Streams 1.0.0 includes wall-clock-time punctuation, so although windows have no "end", you can set up a periodic check for windows that recently "ended" and respond accordingly. – Dmitry Minkovsky Dec 18 '17 at 19:18
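The "check for windows that recently ended" idea from the comment above can be sketched in plain Java. This is only an illustration under stated assumptions, not Kafka Streams code: it buffers a min/max per window and emits each window once, when the largest timestamp seen so far passes the window end (event time is swapped in here for the wall-clock check the comment describes). The class `FinalWindowEmitter` and its methods are hypothetical names. Later Kafka Streams releases (2.1 and up) also offer `KTable#suppress` with `Suppressed.untilWindowCloses`, which may be worth checking if upgrading is an option.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (not the Kafka Streams API): emit one final result per window.
class FinalWindowEmitter {
    static final long SIZE_MS = 5_000L;    // window size from the question
    static final long ADVANCE_MS = 1_000L; // window advance from the question

    // window start -> {min, max}; TreeMap keeps open windows ordered by start time
    private final TreeMap<Long, long[]> open = new TreeMap<>();
    private long streamTimeMs = Long.MIN_VALUE; // largest record timestamp seen so far

    // Updates every still-open window containing the record, then returns the windows
    // that have just closed, each formatted as "start,min,max".
    List<String> process(long timestampMs, long value) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long first = Math.max(0, timestampMs - SIZE_MS + ADVANCE_MS) / ADVANCE_MS * ADVANCE_MS;
        for (long start = first; start <= timestampMs; start += ADVANCE_MS) {
            if (start + SIZE_MS <= streamTimeMs) continue; // window already closed: drop late update
            long[] agg = open.computeIfAbsent(start, s -> new long[] {Long.MAX_VALUE, Long.MIN_VALUE});
            agg[0] = Math.min(agg[0], value);
            agg[1] = Math.max(agg[1], value);
        }
        return closeExpired();
    }

    // Removes and returns every window whose end is at or before the current stream time.
    private List<String> closeExpired() {
        List<String> closed = new ArrayList<>();
        Iterator<Map.Entry<Long, long[]>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, long[]> e = it.next();
            if (e.getKey() + SIZE_MS > streamTimeMs) break; // this and all later windows are still open
            closed.add(e.getKey() + "," + e.getValue()[0] + "," + e.getValue()[1]);
            it.remove();
        }
        return closed;
    }

    public static void main(String[] args) {
        FinalWindowEmitter emitter = new FinalWindowEmitter();
        System.out.println(emitter.process(1_000L, 10)); // prints []
        System.out.println(emitter.process(2_500L, 4));  // prints []
        System.out.println(emitter.process(5_000L, 7));  // prints [0,4,10]
    }
}
```

The filter from the question would then run only on the strings returned by `process`, that is, once per closed window. The trade-off in this sketch is that a window's result is delayed until a later record moves time past its end, and records arriving after that point are dropped for that window.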

0 Answers