2

My application use kafka streams suppress logic.

I want to test kafka streams topology using suppress.

Runnning uinit test, My topology not emit result.

Kafka streams logic

...
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(5), Suppressed.BufferConfig.maxBytes(1_000_000_000L).emitEarlyWhenFull()))
...

My test case code. After create input data, running test case cant't read suppress logic output record. just return null

testDriver.pipeInput(recordFactory.create("input", key, dummy, 0L));

System.out.println(testDriver.readOutput("streams-result", Serdes.String().deserializer(), serde.deserializer()));

Can i test my suppress logic?

Jeahyun Kim
  • 283
  • 1
  • 3
  • 8

1 Answers1

3

The simple answer is yes.

Some good references are Confluent Example Tests this example in particular tests the suppression feature. And many other examples always a good place to check first. Here is another example of mine written in Kotlin.

An explanation of the feature and testing it can be found in post 3 on this blog post

Some key points:

  • The window will only emit the final result as expected from the documents.
  • To flush the final results you will need to send an extra dummy event as seen in the examples such as confluents here.
  • You will need to manipulate the event time to test it as suppression works off the event time this can be provided by the test input topic API or use a custom TimestampExtractor.
  • For testing I recommend setting the following to remove cache and reduce commit interval.

    props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0 props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 5

Hope this helps.

perkss
  • 1,037
  • 1
  • 11
  • 37