I'm playing with the TopologyTestDriver of Kafka Streams in order to get our data pipelines tested.

It has worked like a charm with all our simple topologies, including the stateful ones that use Stores. My problem arises when I try to use this test driver to test topologies that use window aggregation.

I've copied a simple example that sums the integers received with the same key within a 10-second window.

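import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TopologyWindowTests {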

TopologyTestDriver testDriver;
String INPUT_TOPIC = "INPUT.TOPIC";
String OUTPUT_TOPIC = "OUTPUT.TOPIC";

@Before
public void setup(){
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    // The topology consumes <String,Integer> records,
    // so we set those serdes as defaults
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    testDriver = new TopologyTestDriver(defineTopology(),config,0L);
}

/**
 * topology test
 */
@Test
public void testTopologyNoCorrelation() throws IOException {
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), new IntegerSerializer());
    testDriver.pipeInput(factory.create(INPUT_TOPIC,"k",2,1L));

    ProducerRecord<String, Integer> outputRecord = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());

    Assert.assertNull(outputRecord);
}

@After
public void tearDown() {
    testDriver.close();
}

/**
 * Defines topology
 * @return
 */
public Topology defineTopology(){
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String,Integer> inputStream = builder.stream(INPUT_TOPIC);

    KTable<Windowed<String>, Integer> groupedMetrics = inputStream.groupBy((key,value)->key,
            Serialized.with(Serdes.String(),Serdes.Integer())).windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10))).aggregate(
            ()-> 0,
            (String aggKey, Integer newValue, Integer aggValue)->{
                Integer val = aggValue+newValue;
                return val;
            },
            Materialized.<String,Integer,WindowStore<Bytes,byte[]>>as("GROUPING.WINDOW").withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())
    );

    groupedMetrics.toStream().map((key,value)->KeyValue.pair(key.key(),value)).to(OUTPUT_TOPIC);

    return builder.build();

}

}

I would expect that, in this test case, nothing is written to the output topic unless I advance the wall-clock time by 10 seconds. Instead, I'm getting the following output:

java.lang.AssertionError: expected null, but was:<ProducerRecord(topic=OUTPUT.TOPIC, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=k, value=2, timestamp=0)>

Am I missing something here? I'm using Kafka 2.0.0.

Thanks in advance

UPDATE

According to Matthias's response, I've prepared the following test:

@Test
public void testTopologyNoCorrelation() throws IOException {
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), new IntegerSerializer());
    testDriver.pipeInput(factory.create(INPUT_TOPIC,"k",2,1L));
    testDriver.pipeInput(factory.create(INPUT_TOPIC,"k",2,1L));

    // Testing 2+2=4
    ProducerRecord<String, Integer> outputRecord1 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
    Assert.assertEquals(Integer.valueOf(4),outputRecord1.value());

    // Testing no more events in the window
    ProducerRecord<String, Integer> outputRecord2 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
    Assert.assertNull(outputRecord2);
}

Both input messages have been sent with the same timestamp, so I'm expecting only one event in the output topic, containing the sum of my values. However, I'm receiving 2 events in the output (the first with a value of 2, the second with a value of 4), which I think is not the desired behaviour of the topology.

David O

1 Answer

By default, Kafka Streams operates on event-time for window operations, not wall-clock time. This guarantees deterministic processing semantics (wall-clock-time processing is inherently non-deterministic). Check out the docs for more details: https://docs.confluent.io/current/streams/concepts.html#time

Thus, the timestamps of your input records determine in which window a record is put. Also, the timestamps of your input records advance the internally tracked "stream time" that is based on those event timestamps.
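
To make the event-time point concrete, here is a minimal plain-Java sketch (no Kafka dependency) of how a record timestamp selects a tumbling window. It assumes non-negative timestamps and windows whose start is aligned to a multiple of the window size; the class and method names are hypothetical and are not part of the Kafka Streams API.

```java
import java.util.concurrent.TimeUnit;

final class WindowAssignmentSketch {

    // Start (inclusive) of the tumbling window that contains the given
    // event timestamp. Assumes timestampMs >= 0 and windowSizeMs > 0.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = TimeUnit.SECONDS.toMillis(10);
        long[] timestamps = {1L, 9_999L, 10_000L, 25_000L};
        for (long ts : timestamps) {
            long start = windowStart(ts, size);
            // Only the record's own timestamp matters; the wall clock is never consulted.
            System.out.println("ts=" + ts + " -> window [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Under these assumptions, the record from the question (timestamp 1L) lands in the window starting at 0, regardless of when the test actually runs.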

Also note that Kafka Streams follows a continuous processing model and emits updates instead of waiting for a window-end condition. This is important for handling late-arriving (aka out-of-order) data. Compare How to send final kafka-streams aggregation result of a time windowed KTable? and https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/.

Update

It's because of the "update" processing model. When you aggregate, each input record updates the "current" result, and a "current result output record" is produced. This happens for every record (not every timestamp).
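
The update model can be imitated in a few lines of plain Java. This is only a sketch of the idea, not Kafka Streams code: the class name, the `process` method, and the `k@0` key notation (key `k`, window starting at 0) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UpdateModelSketch {

    // Current aggregate per windowed key (stand-in for the window store).
    private final Map<String, Integer> state = new HashMap<>();
    // Everything that would be forwarded downstream, in order.
    private final List<String> output = new ArrayList<>();

    // Every input record updates the aggregate and immediately emits the new result.
    void process(String windowedKey, int value) {
        int updated = state.merge(windowedKey, value, Integer::sum);
        output.add(windowedKey + "=" + updated);
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        UpdateModelSketch sketch = new UpdateModelSketch();
        sketch.process("k@0", 2); // first record in the window
        sketch.process("k@0", 2); // second record, same window and timestamp
        System.out.println(sketch.output()); // prints [k@0=2, k@0=4]
    }
}
```

Two input records therefore produce two output records, 2 and then 4, which matches what the updated test observes.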

Matthias J. Sax
  • Thanks, Matthias, for your quick response. I've added a new test showing the behaviour of the test driver when I send 2 events to the input topic with the same timestamp. I would expect to have only the sum of both values in the output, but I'm getting 2 events. Could you explain that to me? – David O Oct 05 '18 at 16:55
  • @DavidO Extended my answer – Matthias J. Sax Oct 05 '18 at 17:17
  • I understand your point, but that's not the behaviour I see when running the topology against a Kafka cluster. In that case, I see only one output event per window, no matter how many records I've sent. So my point is that something is not running as it should, as the behaviour against the test driver and against a Kafka cluster is different. You can find the whole project here: https://github.com/davidonoro/ks-streaming-example – David O Oct 14 '18 at 09:01
  • The difference is that the `TopologyTestDriver` commits after each record it processes (this includes flushing the KTable state store caches). When you run against a cluster, Kafka Streams by default commits only every 30 seconds, so consecutive updates are "de-duplicated". You can get the same behavior for both if you disable caching. Compare https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html – Matthias J. Sax Oct 16 '18 at 04:45
  • So, based on the last comment, it's not really possible to accurately test a windowed stream topology using the topology test driver? – bad robot Jul 29 '19 at 15:12
  • Why not? You can still test it. – Matthias J. Sax Jul 29 '19 at 15:34
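
The caching difference described in the comments above can be sketched in plain Java as well. This is a simplified model, not the real Kafka Streams cache: the class and method names are hypothetical, and "commit" here only means "flush the latest value per key downstream". In a real application, caching is controlled by the `cache.max.bytes.buffering` setting (`StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG`); setting it to 0 disables caching.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CacheFlushSketch {

    // Latest not-yet-flushed aggregate per key (stand-in for the record cache).
    private final Map<String, Integer> dirty = new LinkedHashMap<>();
    private final List<String> downstream = new ArrayList<>();

    // A later update for the same key overwrites the earlier one in the cache.
    void update(String key, int newAggregate) {
        dirty.put(key, newAggregate);
    }

    // On commit, only the latest value per key is forwarded downstream.
    void commit() {
        dirty.forEach((key, value) -> downstream.add(key + "=" + value));
        dirty.clear();
    }

    // Feeds the values 2 and 2 for key "k" and returns what reaches downstream.
    static List<String> run(boolean commitAfterEachRecord) {
        CacheFlushSketch sketch = new CacheFlushSketch();
        int aggregate = 0;
        for (int value : new int[] {2, 2}) {
            aggregate += value;
            sketch.update("k", aggregate);
            if (commitAfterEachRecord) {
                sketch.commit(); // what the test driver effectively does
            }
        }
        sketch.commit(); // final flush, e.g. when the commit interval elapses
        return sketch.downstream;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [k=2, k=4]
        System.out.println(run(false)); // prints [k=4]
    }
}
```

With a commit after every record, both intermediate results are visible; with a single late commit, only the final value per key survives, which is the behaviour observed against the cluster.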