
I have an app that uses a KStream to read from Kafka, filter the data based on a header, and write the result to a KTable.

public Topology buildTopology() {
        KStream<String,String> inputStream = builder.stream("topicname");
        KStream<String,String> filteredStream = inputStream.transformValues(KSExtension::new)
                .filter((key,value) -> value!=null);
        
        kTable = filteredStream.groupByKey()
                .reduce(((value1, value2) -> value2), Materialized.as("ktable"));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        return builder.build();
    }

I'm trying to create a unit test for this using TopologyTestDriver

private TopologyTestDriver td;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;
    private Topology topology;
    private Properties streamConfig;

@BeforeEach
    void setUp() {
        streamConfig = new Properties();
        streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
        streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
        streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        topology = new Topology();
        td = new TopologyTestDriver(topology, streamConfig);
        inputTopic = td.createInputTopic("input-topic", Serdes.String().serializer(), Serdes.String().serializer());
        outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
    }
 @Test
    void buildTopology(){
        inputTopic.pipeInput("key1", "value1");
        topology = app.buildTopology();
    }

When I run the test, I get the exception "java.lang.IllegalArgumentException: Unknown topic: input-topic"

DEBUG org.apache.kafka.streams.processor.internals.InternalTopologyBuilder - No source topics using pattern subscription found, initializing consumer's subscription collection.

java.lang.IllegalArgumentException: Unknown topic: input-topic
    at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:582)
    at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:945)
    at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
    at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:137)
    at testclassname.buildTopology()

Can someone help me understand what I am missing here?

perplexedDev

1 Answer


I see you are creating an empty Topology, used to initialize the TopologyTestDriver:

topology = new Topology();
td = new TopologyTestDriver(topology, streamConfig);

Because that topology is empty (no sources or processors were ever added to it), the test driver does not know about any topics.

That is why, when you try to pipe input into "input-topic" using inputTopic.pipeInput("key1", "value1"), the test driver throws an IllegalArgumentException complaining about an "Unknown topic: input-topic".


You should call your buildTopology() method to generate the actual topology you are testing, and use that when creating your TopologyTestDriver.

Make sure that the topic names in your test (input-topic, output-topic) match those in your actual application ("topicname").

@BeforeEach
void setUp() {
    streamConfig = new Properties();
    streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
    streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
    streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    // Create the topology using your actual code
    topology = app.buildTopology();

    // Now create a TopologyTestDriver using the real topology
    td = new TopologyTestDriver(topology, streamConfig);

    // The topic name here should match the actual topic you use in the real topology
    inputTopic = td.createInputTopic("topicname", Serdes.String().serializer(), Serdes.String().serializer());

    // Create output topic if you need it
    // outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
}

@Test
void buildTopology(){
    inputTopic.pipeInput("key1", "value1");
    // Your assertions here
}

Note: I removed the output topic from the setup because in your code snippet you did not specify the output topic where your KTable gets written to. You may add it back if your actual application writes to an output topic.
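If your real topology does write the table downstream, you can forward the KTable changelog to a topic and assert on it with a TestOutputTopic. A minimal sketch, assuming a hypothetical topic named "output-topic", the td and inputTopic fields from setUp(), and that the piped records pass your header filter:

```java
// Hypothetical addition to buildTopology(), forwarding every KTable update downstream:
//     kTable.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

@Test
void forwardsLatestValueToOutputTopic() {
    TestOutputTopic<String, String> outputTopic = td.createOutputTopic(
            "output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());

    // Two updates for the same key; reduce((v1, v2) -> v2) keeps the latest value
    inputTopic.pipeInput("key1", "value1");
    inputTopic.pipeInput("key1", "value2");

    // TopologyTestDriver flushes after each input, so both updates are emitted
    assertEquals(new KeyValue<>("key1", "value1"), outputTopic.readKeyValue());
    assertEquals(new KeyValue<>("key1", "value2"), outputTopic.readKeyValue());
}
```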


I have updated the code where I add the name of the ktable store.
How do I test the value was added to the ktable?

You can query the state store that backs the KTable to check its contents.
In Kafka Streams, each materialized KTable is backed by a state store (since Kafka 3.5, released in mid-2023, this can even be a versioned store), and you can interact with that store directly in tests.

Make sure that you set a store name for your KTable in your topology:

kTable = filteredStream.groupByKey()
        .reduce(((value1, value2) -> value2), Materialized.as("myKTableStore"));

Here, "myKTableStore" is the name of the state store that backs the KTable.

In your test, you can retrieve the store from the TopologyTestDriver and check the value for a specific key:

@Test
void buildTopology() {
    inputTopic.pipeInput("key1", "value1");

    // Retrieve the state store
    ReadOnlyKeyValueStore<String, String> keyValueStore = 
        td.getKeyValueStore("myKTableStore");

    // Assert that the KTable contains the expected value for the key
    assertEquals("value1", keyValueStore.get("key1"));
}

That way, you can validate that your KTable contains the expected key-value pairs.
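For broader checks than a single get(), you can also iterate over everything the store currently holds. A small sketch, reusing the td driver and the "myKTableStore" store name from above:

```java
// Dump every entry of the state store backing the KTable.
// KeyValueIterator must be closed, hence the try-with-resources.
KeyValueStore<String, String> store = td.getKeyValueStore("myKTableStore");
try (KeyValueIterator<String, String> all = store.all()) {
    while (all.hasNext()) {
        KeyValue<String, String> entry = all.next();
        System.out.println(entry.key + " -> " + entry.value);
    }
}
```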

Note that ReadOnlyKeyValueStore is part of the Kafka Streams API (org.apache.kafka.streams.state.ReadOnlyKeyValueStore); import it as needed.
You can see it used in "Kafka Streams Interactive Queries / Querying local key-value stores"

https://docs.confluent.io/platform/current/_images/streams-interactive-queries-api-01.png


How can I input headers to the input topic in the test? I couldn't find an option. I filter on header values here: inputStream.transformValues(KSExtension::new)

In Kafka Streams' TopologyTestDriver, you can attach headers by wrapping the key and value in a TestRecord (org.apache.kafka.streams.test.TestRecord) and piping that into the TestInputTopic: the pipeInput(TestRecord) overload accepts records that carry a Headers object.

Build the headers, wrap everything in a TestRecord, and pipe it in:

@Test
void buildTopology() {
    // Create a Headers object and add your custom headers
    Headers headers = new RecordHeaders();
    headers.add(new RecordHeader("myHeaderKey", "myHeaderValue".getBytes()));

    // Wrap key, value and headers in a TestRecord and pipe it into the topology
    inputTopic.pipeInput(new TestRecord<>("key1", "value1", headers));

    // The rest of your test
}

(The older TopologyTestDriver.pipeInput(ConsumerRecord<byte[], byte[]>) also accepted headers, but it has been deprecated and later removed in favor of TestInputTopic.)

Make sure inputTopic was created for the topic your topology actually reads from ("topicname"), and adjust the key, value, and headers as needed for your test.

That should allow you to include headers in your test records, which should then be processed by your transformValues operation as expected.
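To verify the header-based filtering end to end, you can pipe one record with the header and one without, then check the backing store. This sketch assumes (since KSExtension is not shown) that it maps records lacking "myHeaderKey" to null, which your filter then drops, and that the store is named "myKTableStore":

```java
@Test
void dropsRecordsWithoutRequiredHeader() {
    Headers headers = new RecordHeaders();
    headers.add(new RecordHeader("myHeaderKey", "myHeaderValue".getBytes(StandardCharsets.UTF_8)));

    // Record carrying the header: assumed to survive KSExtension and the null filter
    inputTopic.pipeInput(new TestRecord<>("key1", "value1", headers));
    // Record without the header: assumed to be mapped to null and filtered out
    inputTopic.pipeInput(new TestRecord<>("key2", "value2"));

    KeyValueStore<String, String> store = td.getKeyValueStore("myKTableStore");
    assertEquals("value1", store.get("key1"));
    assertNull(store.get("key2"));
}
```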

VonC
  • yes that was it. Thank you so much. – perplexedDev Aug 28 '23 at 19:24
  • I had a follow up question, I have updated the code where I add the name of the ktable store. How do I test the value was added to the ktable? – perplexedDev Aug 28 '23 at 19:29
  • @perplexedDev I have edited the answer to address your comment. – VonC Aug 28 '23 at 19:45
  • great, thank you for the detailed explanation. I am new to Kafka Stream and TopologyTestDriver so this is really helpful. One last query, how can I input headers to the input topic in the test? I couldn't find an option. I filter on the header values here inputStream.transformValues(KSExtension::new) – perplexedDev Aug 28 '23 at 23:26
  • I have another question - https://stackoverflow.com/questions/76949239/kafka-streams-use-manually-created-internal-topics-giving-topicauthorizationexce You seem to be very experienced in Kafka Streams, do you have any thoughts on this question? – perplexedDev Aug 28 '23 at 23:27
  • @perplexedDev I have edited the answer to address your comment – VonC Aug 29 '23 at 06:06