My question is similar to: How to unit test a kafka stream application that uses session window
Topology looks like
.filter()
.groupByKey()
.windowedBy(SessionWindows.with(30).grace(5))
.count()
.toStream()
.selectKey((k, v)->k.key())
.to(outTopic)
When I run this application, and send data like:
key1, {somejson}
key1, {somejson}
key1, {somejson}
In the output topic, I correctly see the record after 30 seconds as expected
key1, 3
When I write a unit test for the same (after reading the other question about advancedWallClockTime, my test code looks like:
final Instant now = Instant.now();
// Send messages with one second difference timestamps
testDriver.pipeInput(consumerRecordFactory.create(inputTopicName, "key1", json, now.toEpochMilli()));
testDriver.pipeInput(consumerRecordFactory.create(inputTopicName, "key1", json, now.plusMillis(1000L).toEpochMilli()));
testDriver.pipeInput(consumerRecordFactory.create(inputTopicName, "key1", json, now.plusMillis(2000L).toEpochMilli()));
testDriver.advanceWallClockTime(35000L)
Then I try to compare the results
ProducerRecord<String, Long> life = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());
Assert.assertEquals(lifevalue, Long.valueOf(3));
I expect it to be 3 but it seems its always 1. But if I write something like:
List<ProducerRecord<String, Long>> expectedList = Arrays.asList(
new ProducerRecord<String, Long>(outputTopicName, "key1", 1L),
new ProducerRecord<String, Long>(outputTopicName, "key1", 2L),
new ProducerRecord<String, Long>(outputTopicName, "key1", 3L)
);
for (ProducerRecord<String, Long> expected : expectedList) {
ProducerRecord<String, Long> actual = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());
Assert.assertEquals(expected.value(), actual.value());
}
then my test passes.
What I am doing wrong? Eventually, I would like to add data for two different keys and test that both of them are coming with count: 3L.