
I am working with Kafka Streams 2.1.

I am trying to write some tests for a stream application that aggregates events by their key (i.e., by a correlation ID) using a session window with an inactivity gap of 300 ms.

Here is the aggregation, implemented as a method:

    private static final int INACTIVITY_GAP = 300;

    public KStream<String, AggregatedCustomObject> aggregate(KStream<String, CustomObject> source) {

        return source
                // group by key (i.e by correlation ID)
                .groupByKey(Grouped.with(Serdes.String(), new CustomSerde()))
                // Define a session window with an inactivity gap of 300 ms
                .windowedBy(SessionWindows.with(Duration.ofMillis(INACTIVITY_GAP)).grace(Duration.ofMillis(INACTIVITY_GAP)))
                .aggregate(
                        // initializer
                        () -> new AggregatedCustomObject(),
                        // aggregates records in same session
                        (s, customObject, aggCustomObject) -> {
                            // ...
                            return aggCustomObject;
                        },
                        // merge sessions
                        (s, aggCustomObject1, aggCustomObject2) -> {
                            // ...
                            return aggCustomObject2;
                        },
                        Materialized.with(Serdes.String(), new AggCustomObjectSerde())
                )
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream()
                .selectKey((stringWindowed, aggCustomObject) -> "someKey");
    }

This stream processing works as expected, but unit testing it is a different story.

My test stream configuration looks like this:

        // ...

        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, myCustomObjectSerde.getClass());
        // disable cache
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // commit ASAP
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);


        StreamsBuilder builder = new StreamsBuilder();
        aggregate(builder.stream(INPUT_TOPIC), OUTPUT_TOPIC, new AggCustomObjectSerde())
                .to(OUTPUT_TOPIC);

        Topology topology = builder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
        ConsumerRecordFactory<String, MyCustomObject> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), myCustomSerializer);

        // ...

A test looks like this:

    List<ConsumerRecord<byte[], byte[]>> records = myCustomMessages.stream()
            .map(myCustomMessage -> factory.create(INPUT_TOPIC, myCustomMessage.correlationId, myCustomMessage))
            .collect(Collectors.toList());
    testDriver.pipeInput(records);

    ProducerRecord<String, AggregatedCustomMessage> record = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), myAggregatedCustomObjectSerde.deserializer());

The problem is that record is always null. I tried a lot of things:

  • read in a loop with a timeout
  • change commit interval in config so result would be committed ASAP
  • send an additional record with a different key just afterwards (to trigger the window closing, since event time in Kafka Streams is based on record timestamps)
  • call the advanceWallClockTime method of the test driver

Nothing helps. Could someone tell me what I am missing, and how I should test a stream application based on session windows?

Thanks a lot

gnos
  • `suppress()` will only emit once event time has passed the window end time plus the grace period. Sending an additional record to advance time seems correct -- what timestamp did you assign to the additional record? – Matthias J. Sax Aug 13 '19 at 18:49
  • @MatthiasJ.Sax Thank you! That was actually my problem. I was setting too low a timestamp on the additional record, so Kafka Streams didn't notice that the window was closed. – gnos Aug 14 '19 at 11:39
  • Hi @gnostrenoff, I have the same problem as you. I'm testing a 1-minute time window, `.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO))`. I pipe one record into the topology, sleep 2 minutes, and send another record, but I still get `null` as the result. Could you please explain how you fixed your test? Thanks a lot! – thinktwice Aug 14 '19 at 22:01
  • @thinktwice It's about _event-time_ not _wall-clock time_ -- you need to set the corresponding record timestamp when you generate the data. – Matthias J. Sax Aug 15 '19 at 12:34
  • Thanks @MatthiasJ.Sax! Once I set the right event time, it works for me! Another thing I noticed with `suppress()`: I thought it should only emit the last record per window, but in the unit test it sends all updates. Is that expected? – thinktwice Aug 15 '19 at 17:17
  • Not sure if I can follow. I thought the problem was that you did not get any output without the additional record? And now you get more than one per window and key? Maybe start your own question and provide more details: what your input data, expected output, and observed output are. – Matthias J. Sax Aug 15 '19 at 23:16
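
The point made in the comments above, that windows close on event time rather than wall-clock time, can be illustrated with a small plain-Java model. This is only a sketch and not Kafka Streams code: `StreamTimeModel`, `observe`, and `isClosed` are hypothetical names, and the closing rule (stream time has reached window end plus grace) is an assumption taken from the description of `suppress()` above.

```java
// Simplified model of event time in a windowed aggregation; this is not Kafka Streams code.
final class StreamTimeModel {
    // Highest record timestamp seen so far; wall-clock time never changes it.
    private long streamTime = Long.MIN_VALUE;

    /** Observes a record timestamp; stream time never moves backwards. */
    public void observe(long recordTimestampMs) {
        streamTime = Math.max(streamTime, recordTimestampMs);
    }

    /** Assumed rule: a window is closed once stream time reaches window end plus grace. */
    public boolean isClosed(long windowEndMs, long graceMs) {
        return streamTime >= windowEndMs + graceMs;
    }

    public static void main(String[] args) {
        long windowEndMs = 60_000L; // end of a 1-minute window starting at 0
        long graceMs = 0L;

        StreamTimeModel model = new StreamTimeModel();
        model.observe(10_000L);
        // Sleeping here would change nothing: without a new record, stream time stands still.
        model.observe(20_000L); // second record still carries a timestamp inside the window
        System.out.println(model.isClosed(windowEndMs, graceMs)); // false

        model.observe(60_000L); // a record stamped at the window end advances stream time
        System.out.println(model.isClosed(windowEndMs, graceMs)); // true
    }
}
```

In this model, only the timestamp carried by a record can close a window, which matches the advice to set record timestamps explicitly instead of sleeping in the test.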

1 Answer

Session windows work with event time, not wall-clock time. Set the event time of your records explicitly to simulate the inactivity gap, for example:

    testDriver.pipeInput(factory.create(INPUT_TOPIC, key1, record1, eventTimeMs));
    testDriver.pipeInput(factory.create(INPUT_TOPIC, key2, record2, eventTimeMs + inactivityGapMs));

But first, you need a custom TimestampExtractor like:

    public static class RecordTimestampExtractor implements TimestampExtractor {

        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            return record.timestamp();
        }
    }

which has to be registered like:

    streamProperties.setProperty(
            StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            RecordTimestampExtractor.class.getName());
Peyman