I have the following logical flow in Kafka Streams:
```java
stream.map((nullKey, rawData) -> KeyValue.pair(someKey(rawData), rawData))
      .filter((k, v) -> someFiltering(v))
      .groupByKey()
      .count(TimeWindows.of(3600).until(TimeUnit.SECONDS.toMillis(7200)), "someStateStore")
      .foreach((k, v) -> print(k.window().start()));
```
The parameters passed to `count` are exactly as shown above, so I expect it to aggregate keys into (logical) hourly windows, i.e. all keys that arrive between 11:00 and 12:00 should be aggregated together.
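For reference, an epoch-aligned tumbling window of size `S` assigns a record with timestamp `ts` to the window starting at `ts - ts % S`. Below is a minimal plain-Java sketch (no Kafka dependency; `WindowAlignment` and `windowStart` are hypothetical names) comparing a true one-hour size with the literal `3600`. It assumes that, as in the pre-2.1 `TimeWindows.of(long sizeMs)` overload, the argument is interpreted as milliseconds.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class WindowAlignment {
    // Hypothetical helper: start of the epoch-aligned tumbling window containing tsMs.
    static long windowStart(long tsMs, long sizeMs) {
        return tsMs - (tsMs % sizeMs);
    }

    public static void main(String[] args) {
        long ts = 1508068707000L;                 // 2017-10-15T11:58:27Z, in milliseconds
        long hourMs = TimeUnit.HOURS.toMillis(1); // 3,600,000 ms = one hour
        long rawMs = 3600L;                       // the literal passed to TimeWindows.of(...)

        // Hourly window: the record falls into the bucket starting at 11:00:00.
        System.out.println(Instant.ofEpochMilli(windowStart(ts, hourMs)));
        // If 3600 is read as milliseconds, the window is only 3.6 s long,
        // so its start is just a fraction of a second before the record.
        System.out.println(Instant.ofEpochMilli(windowStart(ts, rawMs)));
    }
}
```

With the hourly size this prints `2017-10-15T11:00:00Z`; with a 3,600 ms size it prints `2017-10-15T11:58:26.400Z`.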
To validate this, I print the window start time (or at least what I expect it to be) in the `foreach` call. My `COMMIT_INTERVAL_MS_CONFIG` is set to 2 minutes, so the count results should be flushed every 2 minutes, and I expected `k.window().start()` to stay constant between flushes (assuming, of course, that consecutive flushes belong to the same logical hour).
Instead, I see many different start() values:
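| # | window start (epoch s) | time (UTC) |
|---|------------------------|------------|
| 0 | 1508068706 | 11:58:26 |
| 1 | 1508068713 | 11:58:33 |
| 2 | 1508068720 | 11:58:40 |
| 3 | 1508068728 | 11:58:48 |
| 4 | 1508068735 | 11:58:55 |
| 5 | 1508068742 | 11:59:02 |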
These are very different from the start of the logical hour (11:00). They are also unrelated to the 2-minute commit interval: consecutive start values are only 7 to 8 seconds apart.
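The start values above can be checked with a small plain-Java sketch (no Kafka dependency; `ObservedStarts` and `alignsTo` are hypothetical names). For each value it prints the gap to the previous one and whether it could be the truncated-to-seconds start of an epoch-aligned window of a given size. Assuming `TimeWindows.of(long)` takes milliseconds, as the pre-2.1 `TimeWindows.of(long sizeMs)` overload does, the values are consistent with 3,600 ms windows and not with hourly ones; this is a consistency check, not proof.

```java
import java.util.concurrent.TimeUnit;

public class ObservedStarts {
    // Hypothetical helper: true if some multiple of sizeMs lies in
    // [sec * 1000, sec * 1000 + 999], i.e. a window start of that size
    // could have been truncated to this epoch-second value.
    static boolean alignsTo(long sec, long sizeMs) {
        long lo = sec * 1000;
        long firstMultiple = ((lo + sizeMs - 1) / sizeMs) * sizeMs; // smallest multiple >= lo
        return firstMultiple <= lo + 999;
    }

    public static void main(String[] args) {
        // start() values from the output above, as epoch seconds.
        long[] starts = {1508068706L, 1508068713L, 1508068720L,
                         1508068728L, 1508068735L, 1508068742L};
        long hourMs = TimeUnit.HOURS.toMillis(1);

        for (int i = 0; i < starts.length; i++) {
            long gap = (i == 0) ? 0 : starts[i] - starts[i - 1];
            System.out.printf("%d gap=%ds hourAligned=%b aligned3600ms=%b%n",
                    starts[i], gap, alignsTo(starts[i], hourMs), alignsTo(starts[i], 3600L));
        }
    }
}
```

Every row reports `hourAligned=false` and `aligned3600ms=true`, and the gaps of 7 or 8 seconds are roughly two 3.6 s windows apart.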