
I am using Kafka Streams for a windowed aggregation. Here is the logic:

    KStream<String, String> stream = builder.stream(topics, Consumed.with(stringSerde, stringSerde));
    stream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(2)).grace(Duration.ZERO))
        .aggregate(() -> "",
            (key, word, aggregate) -> {
                logger.info("(key, word, aggregate): ({}, {}, {})", key, word, aggregate);
                return aggregate + "-" + word;
            }, Materialized.with(stringSerde, stringSerde))
        .toStream((k, v) -> k.key())
        .foreach((k, v) -> logger.info("Aggregated to Stream. (k,v): ({}, {})", k, v));

Though this works most of the time, I have observed two issues:

  1. The aggregation is flushed prematurely, i.e. an intermediate result is emitted downstream before the window closes
  2. A new aggregation bucket is created for the same key even before the window is closed

These issues are evident in these logs (marked lines):

[2019-08-14 14:10:38,855] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, a, )

(1)[2019-08-14 14:11:24,503] [ INFO] [prc-client-StreamThread-1] Aggregation (124) - Aggregated to Stream. (k,v): (1, -a)

[2019-08-14 14:11:27,735] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, b, -a)
[2019-08-14 14:11:43,298] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, f, -a-b)
[2019-08-14 14:11:59,373] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, b, -a-b-f)

(2)[2019-08-14 14:12:14,196] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, r, )

[2019-08-14 14:13:24,808] [ INFO] [prc-client-StreamThread-1] Aggregation (124) - Aggregated to Stream. (k,v): (1, -a-b-f-b)
[2019-08-14 14:13:24,808] [ INFO] [prc-client-StreamThread-1] Aggregation (124) - Aggregated to Stream. (k,v): (1, -r)

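Regarding issue (2), it may help to note that Kafka Streams tumbling windows are aligned to the Unix epoch, so a record's event timestamp alone determines which window it falls into; a record arriving at 14:12:14 starts a new 2-minute window regardless of when earlier records arrived. A minimal, self-contained sketch (plain Java, using the log timestamps above as event times, and assuming the records carry their wall-clock produce time as the timestamp):

```java
import java.time.Duration;
import java.time.LocalTime;

public class WindowAlignment {
    static final long WINDOW_MS = Duration.ofMinutes(2).toMillis();

    // Tumbling windows are aligned to the epoch: windowStart = ts - (ts % windowSize).
    // A day is an exact multiple of 2 minutes, so time-of-day millis illustrate this.
    static long windowStart(LocalTime eventTime) {
        long ts = eventTime.toNanoOfDay() / 1_000_000;
        return ts - (ts % WINDOW_MS);
    }

    public static void main(String[] args) {
        LocalTime a = LocalTime.of(14, 10, 38);  // record "a" from the logs
        LocalTime b = LocalTime.of(14, 11, 59);  // record "b", last one in the first window
        LocalTime r = LocalTime.of(14, 12, 14);  // record "r"

        // a and b land in the window starting at 14:10:00; r lands in 14:12:00.
        System.out.println(LocalTime.ofNanoOfDay(windowStart(a) * 1_000_000));
        System.out.println(LocalTime.ofNanoOfDay(windowStart(b) * 1_000_000));
        System.out.println(LocalTime.ofNanoOfDay(windowStart(r) * 1_000_000));
    }
}
```

So the empty aggregate at 14:12:14 is not a premature new bucket for the old window; it is the correct initializer for the next window, `[14:12:00, 14:14:00)`.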
Is there any way to address these issues?

Pavan
    Is this relevant: https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables ? – Robin Moffatt Aug 14 '19 at 13:22
  • Yeah, checked it. But it has the behavior described here https://stackoverflow.com/questions/54222594/kafka-stream-suppress-session-windowed-aggregation, which is not desired in our pipeline. We need to get the data as soon as the window ends. – Pavan Aug 16 '19 at 12:35
  • It's by design and you need to insert `suppress` operator if you want a different behavior. This blog post provides more details: https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers – Matthias J. Sax Aug 20 '19 at 01:04
  • With `suppress`, emission works as expected. But even with suppress I am seeing that a new aggregation is being created (for the same key) even when the window is not closed. Could there be any reason for that? Is it related to record timestamps? Please suggest. – Pavan Aug 22 '19 at 03:45
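Following the comment above about inserting a `suppress` operator: the snippet below is a sketch of how the topology from the question could be wired so that only one final result is emitted per window, after the window (plus grace period) has closed. It is a fragment, not a complete program; `BufferConfig.unbounded()` is one possible buffer configuration, and the serde and logger names are taken from the original snippet.

```java
stream.groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(2)).grace(Duration.ZERO))
    .aggregate(() -> "",
        (key, word, aggregate) -> aggregate + "-" + word,
        Materialized.with(stringSerde, stringSerde))
    // Hold back intermediate updates; emit exactly one result per window,
    // once stream time has passed the window end plus the grace period.
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream((k, v) -> k.key())
    .foreach((k, v) -> logger.info("Aggregated to Stream. (k,v): ({}, {})", k, v));
```

Note that `suppress` advances on stream time, not wall-clock time: the final result for a window is emitted only when a later record moves stream time past the window end, not at the moment the window's wall-clock deadline passes.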

0 Answers