I am writing a streaming application with Kafka Streams, Spring-Kafka and Spring Boot. I cannot find any information on how to properly test stream processing written with the Kafka Streams DSL when using Spring-Kafka. The documentation mentions EmbeddedKafkaBroker, but there seems to be no information on how to test, for example, state stores.
To give a simple example of what I would like to test: I have the following bean registered (where Item is an Avro-generated class):
    @Bean
    public KTable<String, Long> itemTotalKTable(StreamsBuilder streamsBuilder) {
        return streamsBuilder
                .stream(ITEM_TOPIC,
                        Consumed.with(Serdes.String(), itemAvroSerde))
                .mapValues((id, item) -> item.getNumber())
                .groupByKey()
                .aggregate(
                        () -> 0L,
                        (id, number, agg) -> agg + number,
                        Materialized.with(Serdes.String(), Serdes.Long()));
    }
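To make the expected result concrete, here is a minimal plain-Java sketch of what the aggregation should produce: one running sum per key. It does not use Kafka Streams at all. The names ItemTotalsSketch, ItemRecord and aggregate are hypothetical and exist only for this illustration, and I am assuming item.getNumber() can be treated as a long.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ItemTotalsSketch {

    // Hypothetical stand-in for one record on ITEM_TOPIC:
    // the record key plus the value of item.getNumber().
    static final class ItemRecord {
        final String id;
        final long number;

        ItemRecord(String id, long number) {
            this.id = id;
            this.number = number;
        }
    }

    // Mirrors groupByKey().aggregate(...): each key starts at 0L
    // and every incoming number is added to that key's total.
    static Map<String, Long> aggregate(List<ItemRecord> records) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (ItemRecord r : records) {
            totals.merge(r.id, r.number, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<String, Long> totals = aggregate(List.of(
                new ItemRecord("a", 1L),
                new ItemRecord("b", 2L),
                new ItemRecord("a", 3L)));
        System.out.println(totals); // prints {a=4, b=2}
    }
}
```

In other words, after sending items with numbers 1 and 3 under key "a" and 2 under key "b", I would expect the KTable (and its state store) to hold 4 for "a" and 2 for "b".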
What is a proper way to test that all item numbers are aggregated?