
My question is similar to: How to unit test a kafka stream application that uses session window

Topology looks like

.filter()
.groupByKey()
.windowedBy(SessionWindows.with(30).grace(5))
.count()
.toStream()
.selectKey((k, v)->k.key())
.to(outTopic)

When I run this application, and send data like:

key1, {somejson}
key1, {somejson}
key1, {somejson}

In the output topic, I correctly see the record after 30 seconds as expected

key1, 3

When I write a unit test for the same topology (after reading the other question about advanceWallClockTime), my test code looks like:

final Instant now = Instant.now();

// Send messages with one second difference timestamps
testDriver.pipeInput(consumerRecordFactory.create(inputTopicName, "key1", json, now.toEpochMilli()));
testDriver.pipeInput(consumerRecordFactory.create(inputTopicName, "key1", json, now.plusMillis(1000L).toEpochMilli()));
testDriver.pipeInput(consumerRecordFactory.create(inputTopicName, "key1", json, now.plusMillis(2000L).toEpochMilli()));

testDriver.advanceWallClockTime(35000L);

Then I try to compare the results

ProducerRecord<String, Long> life = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());
Assert.assertEquals(Long.valueOf(3), life.value());

I expect it to be 3, but it seems it's always 1. But if I write something like:

List<ProducerRecord<String, Long>> expectedList = Arrays.asList(
  new ProducerRecord<String, Long>(outputTopicName, "key1", 1L),
  new ProducerRecord<String, Long>(outputTopicName, "key1", 2L),
  new ProducerRecord<String, Long>(outputTopicName, "key1", 3L)
);

for (ProducerRecord<String, Long> expected : expectedList) {
    ProducerRecord<String, Long> actual = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());
    Assert.assertEquals(expected.value(),  actual.value());
}

then my test passes.

What am I doing wrong? Eventually, I would like to add data for two different keys and test that both of them come out with a count of 3L.

1 Answer

The difference you see in your test comes from how the TopologyTestDriver works. It might help to first explain how Kafka Streams treats stateful operations, for some context.

When you run the Kafka Streams application "for real", records from stateful operations are buffered by the internal cache. Kafka Streams flushes the internal cache when either of the following two conditions is met:

  1. Committing records (default commit interval is 30 seconds)
  2. The cache is full.

From what you describe above, you observe the count of 3 after streams commits the consumed offsets. The first two records were replaced in the cache, and only the last count of 3 is emitted.
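To make the caching behaviour concrete, here is a toy model in plain Java. It is not Kafka Streams code and does not reflect how the real cache is implemented; the class `CacheSketch` and its methods are made up for illustration. It only shows the idea that updates for the same key overwrite each other until a flush, so downstream sees just the latest count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical names): illustrates how a per-key cache
// collapses intermediate updates until it is flushed.
public class CacheSketch {
    // Running count per key, standing in for the state store.
    private final Map<String, Long> store = new HashMap<>();
    // Updates not yet forwarded downstream, standing in for the cache.
    private final Map<String, Long> dirty = new LinkedHashMap<>();

    // Count one more record for the key; the new count replaces any
    // pending update for that key instead of being forwarded right away.
    void update(String key) {
        long count = store.merge(key, 1L, Long::sum);
        dirty.put(key, count);
    }

    // Stands in for a commit or a full cache: forward the latest value
    // per key and clear the pending updates.
    List<String> flush() {
        List<String> forwarded = new ArrayList<>();
        dirty.forEach((k, v) -> forwarded.add(k + "=" + v));
        dirty.clear();
        return forwarded;
    }

    // Three records for key1, then a single flush.
    static List<String> demo() {
        CacheSketch sketch = new CacheSketch();
        sketch.update("key1");
        sketch.update("key1");
        sketch.update("key1");
        return sketch.flush();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the counts 1 and 2 never leave the cache, which mirrors what you observe in the output topic of the running application.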

Now with the TopologyTestDriver, there is no internal caching; the test driver forwards each record. As a result, you'll have to call testDriver.readOutput for each record you've submitted.

So your line above

ProducerRecord<String, Long> life = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());

returns only the output for the first record you supplied via testDriver.pipeInput, because you called testDriver.readOutput just once.

You'll notice in your second code example:

for (ProducerRecord<String, Long> expected : expectedList) {
    ProducerRecord<String, Long> actual = testDriver.readOutput(outputTopicName, stringSerde.deserializer(), longSerde.deserializer());
    Assert.assertEquals(expected.value(),  actual.value());
}

You get the expected result because you execute testDriver.readOutput the same number of times as you've input test records.
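For the case of several keys, one possible approach is to drain all output and keep only the last value seen per key, then assert on that map. The sketch below is plain Java with a hypothetical helper, `latestPerKey`, that reads from any supplier until it returns null. In a real test the supplier would wrap testDriver.readOutput(...) and map each ProducerRecord to a key/value pair; this assumes readOutput returns null once no output is left, which is worth checking against the version you use. The in-memory queue in `demo` only stands in for the output topic.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical test helper, not part of the Kafka Streams API.
public class DrainSketch {
    // Reads until the supplier returns null, keeping the last value per key.
    static <K, V> Map<K, V> latestPerKey(Supplier<Map.Entry<K, V>> reader) {
        Map<K, V> latest = new HashMap<>();
        Map.Entry<K, V> next;
        while ((next = reader.get()) != null) {
            latest.put(next.getKey(), next.getValue());
        }
        return latest;
    }

    // Simulates the uncached output for two keys with three records each.
    static Map<String, Long> demo() {
        Queue<Map.Entry<String, Long>> output = new ArrayDeque<>();
        output.add(Map.entry("key1", 1L));
        output.add(Map.entry("key2", 1L));
        output.add(Map.entry("key1", 2L));
        output.add(Map.entry("key2", 2L));
        output.add(Map.entry("key1", 3L));
        output.add(Map.entry("key2", 3L));
        // poll() returns null when the queue is empty, which ends the loop.
        return latestPerKey(output::poll);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a map like this, the test can assert that both keys end at 3L without caring how many intermediate updates were forwarded.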

HTH,

Bill

bbejeck
  • Thanks. That explains everything. I tried some more examples based on your comment and they work as expected. This should be accepted answer. – psykidellic Jun 01 '20 at 18:34