I have an app that uses a KStream to read from Kafka, filters the records based on a header, and writes the result to a KTable:
    public Topology buildTopology() {
        KStream<String, String> inputStream = builder.stream("topicname");
        KStream<String, String> filteredStream = inputStream.transformValues(KSExtension::new)
                .filter((key, value) -> value != null);
        kTable = filteredStream.groupByKey()
                .reduce((value1, value2) -> value2, Materialized.as("ktable"));
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        return builder.build();
    }
I'm trying to write a unit test for this using TopologyTestDriver:
    private TopologyTestDriver td;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;
    private Topology topology;
    private Properties streamConfig;

    @BeforeEach
    void setUp() {
        streamConfig = new Properties();
        streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
        streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
        streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        topology = new Topology();
        td = new TopologyTestDriver(topology, streamConfig);
        inputTopic = td.createInputTopic("input-topic", Serdes.String().serializer(), Serdes.String().serializer());
        outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
    }

    @Test
    void buildTopology() {
        inputTopic.pipeInput("key1", "value1");
        topology = app.buildTopology();
    }
When I run the test, I get the exception "java.lang.IllegalArgumentException: Unknown topic: input-topic":
    DEBUG org.apache.kafka.streams.processor.internals.InternalTopologyBuilder - No source topics using pattern subscription found, initializing consumer's subscription collection.
    java.lang.IllegalArgumentException: Unknown topic: input-topic
        at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:582)
        at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:945)
        at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
        at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:137)
        at testclassname.buildTopology()
Can someone help me understand what I am missing here?
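
For comparison, here is a minimal sketch of the wiring I would expect TopologyTestDriver to need. It rests on a few assumptions: the driver is constructed from the fully built topology rather than an empty `new Topology()`, the test input topic uses the same name the topology reads from ("topicname"), and no `KafkaStreams` instance is started inside the method under test. The class name `TopologySketch` and the helper `pipeAndRead` are made up for illustration, and the `transformValues(KSExtension::new)` step is left out because `KSExtension` is not shown above. It needs the kafka-streams and kafka-streams-test-utils dependencies on the classpath.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class TopologySketch {

    // Only describes the topology; it does not create or start KafkaStreams.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("topicname")
                // The header-based transformValues step would go here (omitted: KSExtension is not shown).
                .filter((key, value) -> value != null)
                .groupByKey()
                // Keep the latest value per key in the state store named "ktable".
                .reduce((oldValue, newValue) -> newValue, Materialized.as("ktable"));
        return builder.build();
    }

    // Hypothetical helper: pipes one record through the topology and reads it back from the store.
    static String pipeAndRead(String key, String value) {
        Properties config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234"); // never contacted by the test driver
        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // The driver is built from the real topology, so it knows about "topicname".
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), config)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "topicname", Serdes.String().serializer(), Serdes.String().serializer());
            input.pipeInput(key, value);

            // The topology writes to a state store, not an output topic, so the store is queried directly.
            KeyValueStore<String, String> store = driver.getKeyValueStore("ktable");
            return store.get(key);
        }
    }

    public static void main(String[] args) {
        System.out.println(pipeAndRead("key1", "value1"));
    }
}
```

In a JUnit test the same steps would be split between `setUp()` (build the topology, create the driver and the input topic) and the test method (pipe input, assert on the store). Since the topology above has no sink topic, a `TestOutputTopic` would only be useful if something like `kTable.toStream().to("output-topic")` were added.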