
I have the following logical flow in kafka streams:

stream.map((nullKey, rawData) -> KeyValue.pair(somekey(rawData), rawData))
            .filter((k, v) -> somefilterning(v))
            .groupByKey()
            .count(TimeWindows.of(3600).until(TimeUnit.SECONDS.toMillis(7200)), "someStateStore")
            .foreach((k, v) -> print(k.window().start()));

The parameters passed to the count method are exactly as shown above, so it should aggregate the keys in a (logical) hourly window, i.e. all keys that arrived between 11:00 and 12:00 should be aggregated together.

To validate that, I'm printing the window start time (or at least what I expect it to be) in the foreach call. My COMMIT_INTERVAL_MS_CONFIG is set to 2 min, so count outputs should be flushed every 2 min, and I expected k.window().start() to be constant between flushes (assuming, of course, flushes within the same logical hour).

Instead, I see many different start() values:

     timestamp    time
0   1508068706  11:58:26
1   1508068713  11:58:33
2   1508068720  11:58:40
3   1508068728  11:58:48
4   1508068735  11:58:55
5   1508068742  11:59:02

This is very different from the logical hour (11:00). It is also unrelated to the 2 min commit interval, as you can see timestamps only 7-8 seconds apart.

idoda

1 Answer


There are multiple parts to this answer:

First, TimeWindows.of() and until() take their arguments in milliseconds, not seconds. TimeWindows.of(3600) therefore defines 3.6-second windows, not hourly ones, which is why consecutive k.window().start() values are only a few seconds apart instead of all falling on 11:00.

Second, the commit interval is not the only thing that controls when downstream records are emitted. The record cache (cache.max.bytes.buffering) also plays a role: when the cache fills up, entries are evicted and forwarded downstream immediately, so updates can appear more often than every 2 minutes. Fixing the window size alone will give you a single hourly window, but with a small cache you may still see multiple intermediate updates per window.
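To make the ms-vs-seconds mix-up concrete, here is a minimal sketch using only the JDK; the corrected count() call shown in the comments is an assumption based on the API used in the question:

```java
import java.util.concurrent.TimeUnit;

public class WindowSizeCheck {
    public static void main(String[] args) {
        // What the question's code actually asks for: a 3600 ms window
        long actualWindowMs = 3600L;
        // What an hourly window requires: 3,600,000 ms
        long hourlyWindowMs = TimeUnit.HOURS.toMillis(1);

        System.out.println(actualWindowMs);  // 3600 -> only 3.6 seconds
        System.out.println(hourlyWindowMs);  // 3600000

        // The corrected call would therefore look like (sketch, not compiled here):
        // .count(TimeWindows.of(TimeUnit.HOURS.toMillis(1))
        //                   .until(TimeUnit.HOURS.toMillis(2)), "someStateStore")
    }
}
```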

Matthias J. Sax
  • Thanks. Good catch about the ms vs. sec, but also about the cache size. I guess changing the ms to sec will not help unless I also increase the cache size. – idoda Oct 15 '17 at 16:26