
I am fairly new to Kafka Streams, and I have noticed a behavior I did not expect. I have developed an app that consumes from 6 topics. My goal is to group (or join) the events from every topic by an internal field, and that part works fine. My issue is with the window timing: it looks like the end of each cycle affects all the aggregations that are in progress at that moment. Is there only one timer for all the aggregations running at the same time? I expected each aggregation to leave the aggregation process on its own, as soon as its configured 30 seconds had elapsed. I think this should be possible, because when I inspect the Windowed windowedRegion variable, the windowedRegion.window().start() and windowedRegion.window().end() values are different for every stream. This is my code:

streamsBuilder
    .stream(topicList, Consumed.with(Serdes.String(), Serdes.String()))
    // re-key every event by the internal field shared across the 6 topics
    .groupBy(new MyGroupByKeyValueMapper(), Serialized.with(Serdes.String(), Serdes.String()))
    // session windows per key, split by gaps longer than windowInactivity;
    // until() sets how long sessions are retained in the state store
    .windowedBy(SessionWindows.with(windowInactivity).until(windowDuration))
    .aggregate(
        new MyInitializer(),
        new MyAggregator(),
        new MyMerger(),
        Materialized.with(new Serdes.StringSerde(), new PaymentListSerde())
    )
    .mapValues(new MyMapper())
    .toStream(new MyKeyValueMapper())
    .to(consolidationTopic, Produced.with(Serdes.String(), Serdes.String()));
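A minimal, stand-alone sketch of how session windows are tracked per key may help frame the question. It is not Kafka Streams code: the class SessionWindowSketch and its methods are hypothetical, and it only models the merge rule (an event joins every existing session of the same key that lies within the inactivity gap, otherwise it opens a new session). It illustrates why window().start() and window().end() differ from key to key. It assumes Java 16+ because it uses a record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SessionWindowSketch {

    // One session: the time range [start, end] covered by events of a single key.
    record Session(long start, long end, int count) {}

    private final long inactivityGapMs;
    // Sessions are stored per key, so each key has its own independent start/end.
    private final Map<String, List<Session>> sessionsByKey = new HashMap<>();

    public SessionWindowSketch(long inactivityGapMs) {
        this.inactivityGapMs = inactivityGapMs;
    }

    // Adds an event: every session of the same key with start <= t + gap and
    // end >= t - gap is merged with the event; otherwise a new session is opened.
    public Session add(String key, long timestamp) {
        List<Session> sessions = sessionsByKey.computeIfAbsent(key, k -> new ArrayList<>());
        long start = timestamp;
        long end = timestamp;
        int count = 1;
        List<Session> remaining = new ArrayList<>();
        for (Session s : sessions) {
            boolean withinGap = s.start() <= timestamp + inactivityGapMs
                    && s.end() >= timestamp - inactivityGapMs;
            if (withinGap) {
                start = Math.min(start, s.start());
                end = Math.max(end, s.end());
                count += s.count();
            } else {
                remaining.add(s);
            }
        }
        Session merged = new Session(start, end, count);
        remaining.add(merged);
        sessionsByKey.put(key, remaining);
        return merged;
    }

    public List<Session> sessions(String key) {
        return sessionsByKey.getOrDefault(key, List.of());
    }

    public static void main(String[] args) {
        SessionWindowSketch windows = new SessionWindowSketch(30_000); // 30 s gap
        windows.add("A", 0);
        windows.add("A", 10_000);  // within 30 s of A's session: merged
        windows.add("B", 5_000);   // different key: its own session
        windows.add("A", 100_000); // more than 30 s after A's last event: new session
        System.out.println("A: " + windows.sessions("A"));
        System.out.println("B: " + windows.sessions("B"));
    }
}
```

Each key ends up with its own sessions, so nothing in the windowing itself forces sessions of different keys to finish at the same moment.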
JosiDiez

1 Answer


I'm not sure if this is what you're asking, but every aggregation (every per-key session window) may indeed be updated multiple times. In general, you will not get just one message per window carrying the final result for that session window on your "consolidation" topic. This is explained in more detail here: https://stackoverflow.com/a/38945277/7897191
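The "single timer" effect described in the question is consistent with the Kafka Streams record cache: updates to a KTable (such as a windowed aggregate) are buffered and deduplicated per key, and the cache is flushed downstream on every commit. The commit interval is set by commit.interval.ms, which defaults to 30000 ms under at-least-once processing, so results for many different sessions tend to be forwarded together. Setting cache.max.bytes.buffering (renamed statestore.cache.max.bytes in newer releases) to 0 disables the cache so that every update is forwarded. The plain-Java sketch below is a simplified model of that flush-on-commit behavior under these assumptions, not the library's implementation; CommitFlushSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CommitFlushSketch {

    // Latest aggregate per session key that has not been forwarded yet ("dirty" entries).
    private final Map<String, Integer> dirty = new LinkedHashMap<>();
    private final long commitIntervalMs;
    private long lastCommitMs = 0;
    // Records forwarded downstream, as "<commit time> <key>=<value>".
    private final List<String> emitted = new ArrayList<>();

    public CommitFlushSketch(long commitIntervalMs) {
        this.commitIntervalMs = commitIntervalMs;
    }

    // An update only overwrites the cached value for its session; earlier values
    // for the same key are dropped (deduplicated) and nothing is forwarded yet.
    public void update(String sessionKey, int aggregate, long nowMs) {
        dirty.put(sessionKey, aggregate);
        maybeCommit(nowMs);
    }

    // The commit timer is shared by all keys: once it fires, every dirty entry is
    // forwarded at the same moment, whatever its own session start/end is.
    // (A real cache also forwards entries it evicts when full; not modeled here.)
    public void maybeCommit(long nowMs) {
        if (nowMs - lastCommitMs >= commitIntervalMs) {
            for (Map.Entry<String, Integer> e : dirty.entrySet()) {
                emitted.add(nowMs + " " + e.getKey() + "=" + e.getValue());
            }
            dirty.clear();
            lastCommitMs = nowMs;
        }
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CommitFlushSketch cache = new CommitFlushSketch(30_000);
        cache.update("A", 1, 1_000);   // first update of session A
        cache.update("A", 2, 5_000);   // overwrites A's cached value
        cache.update("B", 1, 12_000);  // a different session with its own start/end
        cache.maybeCommit(30_000);     // commit: A and B are forwarded together
        cache.emitted().forEach(System.out::println);
    }
}
```

In this model both sessions leave at the same commit time even though their windows differ, and session A's intermediate value is never forwarded, which mirrors the behavior asked about.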

Michal Borowiecki