My intention with this topology is to window incoming messages, count them, and send the count to another topic.
When I test this by sending a single key with one or more values to the input topic, I get inconsistent results. Sometimes the count is correct. Other times I send in a single message and see it at the first peek, but instead of a count of 1, I get some other value at the second peek and in the output topic. When I send in multiple messages, the count is usually right, but sometimes off. I'm careful to send the messages inside the time window, so I don't think they're getting split into two windows.
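One detail relevant to the last point: Kafka Streams tumbling windows are aligned to the epoch, not to the first message, so a 1-minute window runs from one whole UTC minute to the next. The plain-Java sketch below (not Kafka code) illustrates that alignment, assuming record timestamps are non-negative epoch milliseconds; the class name and the two sample timestamps are hypothetical. It shows that two messages sent only 10 seconds apart can still land in different windows when they straddle a minute boundary.

```java
import java.time.Duration;
import java.time.Instant;

public class TumblingWindowAlignment {

    // Start (inclusive) of the epoch-aligned tumbling window that contains the
    // timestamp, for non-negative epoch-millisecond timestamps.
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(1).toMillis();
        // Two hypothetical test messages sent 10 seconds apart
        long first = Instant.parse("2024-01-01T12:00:55Z").toEpochMilli();
        long second = Instant.parse("2024-01-01T12:01:05Z").toEpochMilli();

        // Prints 2024-01-01T12:00:00Z, then 2024-01-01T12:01:00Z: two different windows
        System.out.println(Instant.ofEpochMilli(windowStart(first, size)));
        System.out.println(Instant.ofEpochMilli(windowStart(second, size)));
    }
}
```

Comparing `windowStart` for the actual timestamps of the test messages is a quick way to confirm whether they really share a window.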
Is there a flaw in my topology?
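
    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
    import org.apache.kafka.streams.state.WindowStore;

    public class WindowCountTopology { // class name is illustrative
        public static final String INPUT_TOPIC = "test-topic";
        public static final String OUTPUT_TOPIC = "test-output-topic";

        public static void buildTopo(StreamsBuilder builder) {
            // Persistent window store: 1-day retention, 1-minute window size, no duplicates
            WindowBytesStoreSupplier store = Stores.persistentTimestampedWindowStore(
                    "my-state-store",
                    Duration.ofDays(1),
                    Duration.ofMinutes(1),
                    false);

            Materialized<String, Long, WindowStore<Bytes, byte[]>> materialized = Materialized
                    .<String, Long>as(store)
                    .withKeySerde(Serdes.String());

            // Emit only the final count for each window, once the window has closed
            Suppressed<Windowed<String>> suppression = Suppressed
                    .untilWindowCloses(Suppressed.BufferConfig.unbounded());

            // 1-minute tumbling windows with no grace period
            TimeWindows window = TimeWindows
                    .of(Duration.ofMinutes(1))
                    .grace(Duration.ofSeconds(0));

            builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
                    .peek((key, value) -> System.out.println("****key = " + key + " value= " + value))
                    .groupByKey()
                    .windowedBy(window)
                    .count(materialized)
                    .suppress(suppression)
                    .toStream()
                    .peek((key, value) -> System.out.println("key = " + key + " value= " + value))
                    // The windowed key holds the String key plus the time window; keep only the String key
                    .map((key, value) -> new KeyValue<>(key.key(), value))
                    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        }
    }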
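To reason about when the second peek should fire, the behaviour of `count()` followed by `suppress(untilWindowCloses(...))` can be approximated in a few lines of plain Java. This is a simplified sketch, not Kafka's implementation. It assumes a single stream time for the whole topology (Kafka actually tracks stream time per task/partition), epoch-aligned tumbling windows, and that a window's final count is released only once stream time reaches window end + grace, with records for an already-closed window dropped. `SuppressedWindowCountModel` and `process()` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model (NOT Kafka's implementation) of
// groupByKey().windowedBy(tumbling window).count().suppress(untilWindowCloses(...)).
// Assumes one stream time for everything; Kafka tracks stream time per task.
public class SuppressedWindowCountModel {

    private final long windowSizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;
    // key -> (window start -> buffered count) for windows that have not closed yet
    private final Map<String, TreeMap<Long, Long>> openWindows = new TreeMap<>();

    public SuppressedWindowCountModel(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Processes one record and returns the final counts it releases,
    // formatted as "key@windowStart=count".
    public List<String> process(String key, long timestampMs) {
        // Stream time only moves forward and is driven by record timestamps, not wall-clock time
        streamTime = Math.max(streamTime, timestampMs);
        long start = timestampMs - (timestampMs % windowSizeMs);
        boolean windowClosed = start + windowSizeMs + graceMs <= streamTime;
        if (!windowClosed) {
            openWindows.computeIfAbsent(key, k -> new TreeMap<>()).merge(start, 1L, Long::sum);
        } // else: the record is late for its window and is dropped
        return releaseClosedWindows();
    }

    // Emits and forgets every buffered window whose end + grace <= stream time
    private List<String> releaseClosedWindows() {
        List<String> released = new ArrayList<>();
        for (Map.Entry<String, TreeMap<Long, Long>> perKey : openWindows.entrySet()) {
            Iterator<Map.Entry<Long, Long>> it = perKey.getValue().entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> window = it.next();
                if (window.getKey() + windowSizeMs + graceMs <= streamTime) {
                    released.add(perKey.getKey() + "@" + window.getKey() + "=" + window.getValue());
                    it.remove();
                }
            }
        }
        return released;
    }

    public static void main(String[] args) {
        SuppressedWindowCountModel model = new SuppressedWindowCountModel(60_000L, 0L);
        System.out.println(model.process("a", 10_000L)); // [] : count buffered, window still open
        System.out.println(model.process("a", 30_000L)); // [] : same window, still open
        System.out.println(model.process("a", 70_000L)); // [a@0=2] : stream time reached the first window's end
    }
}
```

With a 1-minute window and zero grace, as in the topology, the model releases nothing for a lone record; the final count for a window only appears once a later record pushes stream time past that window's end. Feeding it the real test timestamps gives an expected sequence of outputs to compare against the second peek.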