How can I create a unit test for a stateful process function? I have something like this:

private static SingleOutputStreamOperator<Tuple> methodName(KeyedStream<Event, String> stream) {
    return stream.window(TumblingEventTimeWindows.of(Time.minutes(10)))
            .process(new ProcessFunction());
}

and

ProcessFunction extends ProcessWindowFunction<IN, OUT, KEY, W>

All the test harness examples I've found on the Flink page are for a KeyedProcessFunction, which is not my case. Thanks. Kind regards!

David Anderson
Alter

2 Answers

In general these test harnesses expect to be testing an operator, rather than a user function. So in the case of a ProcessWindowFunction, you first need to create a suitable window operator to pass to the test harness.

You can test a ProcessWindowFunction using a OneInputStreamOperatorTestHarness that you instantiate with a WindowOperator wrapped around your ProcessWindowFunction. I'm afraid this isn't particularly straightforward, but I can refer you to https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L437 as an example.

David Anderson
    Could you please give a simple, basic example of testing a Flink window function, especially one using `SlidingProcessingTimeWindows`? I looked into these code snippets, but they seem quite complicated to me, and I'm not able to apply them to my project. I also tried to unit test a window function with a custom sink, but I get no results on the output. The Flink documentation says nothing about testing window functions either, and I cannot find a very basic example anywhere. – Piotr Wittchen Jun 17 '21 at 12:09

I found a solution, inspired by this method: https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L1340

In my case, I have to test a TumblingProcessingTimeWindows window where the process() operator uses a ProcessWindowFunction to count words while keeping the count from previous windows (i.e., not resetting the count each time the window is triggered).

WordCountPojo is a simple POJO with two fields: word and count (you can use Tuple2 if you prefer).
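Neither WordCountPojo nor Counter is shown in this answer, so the following is only a minimal sketch of what the test appears to rely on. The field names, the accessor names other than getWord(), and the RunningWordCount class are assumptions made for illustration. RunningWordCount is a plain-Java stand-in for the "keep the previous count" behaviour and uses no Flink types; a real Counter would hold the running total in Flink state rather than in a HashMap. The equals()/hashCode() pair matters because the harness assertion compares StreamRecord objects, and StreamRecord.equals() delegates to the value's equals().

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical POJO with the two fields the answer mentions.
// With Flink, declare it public in its own file so it qualifies as a POJO type.
class WordCountPojo {
    private String word;
    private int count;

    public WordCountPojo() {}  // no-arg constructor, required for Flink POJO types

    public WordCountPojo(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    // Value equality, so that expected and actual records can be compared.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof WordCountPojo)) return false;
        WordCountPojo other = (WordCountPojo) o;
        return count == other.count && Objects.equals(word, other.word);
    }

    @Override
    public int hashCode() { return Objects.hash(word, count); }

    @Override
    public String toString() { return word + "=" + count; }
}

// Plain-Java model of a counter that does NOT reset between windows.
// The map stands in for per-key state that survives window firings.
class RunningWordCount {
    private final Map<String, Integer> totals = new HashMap<>();

    // Called once per key each time a window fires, with that window's elements.
    WordCountPojo onWindowFired(String word, Iterable<WordCountPojo> windowContents) {
        int inWindow = 0;
        for (WordCountPojo element : windowContents) {
            inWindow += element.getCount();
        }
        int total = totals.merge(word, inWindow, Integer::sum);  // add to the previous total
        return new WordCountPojo(word, total);
    }
}

class RunningWordCountDemo {
    public static void main(String[] args) {
        RunningWordCount counter = new RunningWordCount();
        // First window contains one "to"; second window contains one more.
        System.out.println(counter.onWindowFired("to", Arrays.asList(new WordCountPojo("to", 1))));
        System.out.println(counter.onWindowFired("to", Arrays.asList(new WordCountPojo("to", 1))));
    }
}
```

Under this model, a word seen once in each of two consecutive windows is emitted with count 1 and then count 2, which is the pattern the test asserts for "to" and "be".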

This is the test I wrote:

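// Imports needed by the test below (package names as of Flink 1.11; JUnit 5 is assumed)
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.jupiter.api.Test;

@Test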
void testCounter() throws Exception {
    //create a WindowOperator<Key, Input, Accumulator, Output, Window>
    WindowOperator<String, WordCountPojo, Iterable<WordCountPojo>, WordCountPojo, TimeWindow> operator =
            new WindowOperator<>(
                    TumblingProcessingTimeWindows.of(Time.seconds(3)), //window assigner
                    new TimeWindow.Serializer(), //window serializer
                    WordCountPojo::getWord, //key selector
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), //key serializer
                    new ListStateDescriptor<>( //window state descriptor (in order to accumulate events inside the window)
                            "window-content",
                            TypeInformation.of(WordCountPojo.class).createSerializer(new ExecutionConfig())), //input serializer
                    new InternalIterableProcessWindowFunction<>(new Counter()), //my custom ProcessWindowFunction to invoke
                    ProcessingTimeTrigger.create(), //window trigger
                    0,
                    null);

    //Flink Test Harness
    OneInputStreamOperatorTestHarness<WordCountPojo, WordCountPojo> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(operator, WordCountPojo::getWord, BasicTypeInfo.STRING_TYPE_INFO);

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    harness.open();
    harness.setProcessingTime(10);

    //Push data into window
    harness.processElement(new StreamRecord<>(new WordCountPojo("to", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("be", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("or", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("not", 1)));

    harness.setProcessingTime(3500); //Set processing time in order to trigger the window

    //Expected result
    expected.add(new StreamRecord<>(new WordCountPojo("to", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("be", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("or", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("not", 1), 2999));

    TestHarnessUtil.assertOutputEqualsSorted("Output not equal to expected", expected, harness.getOutput(),
            Comparator.comparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getValue().getWord())
                    .thenComparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getTimestamp()));

    //push other WordCountPojos to test global counting
    harness.processElement(new StreamRecord<>(new WordCountPojo("to", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("be", 1)));

    harness.setProcessingTime(7000); //trigger the window again

    //Expected result
    expected.add(new StreamRecord<>(new WordCountPojo("to", 2), 5999));
    expected.add(new StreamRecord<>(new WordCountPojo("be", 2), 5999));

    TestHarnessUtil.assertOutputEqualsSorted("Output not equal to expected", expected, harness.getOutput(),
            Comparator.comparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getValue().getWord())
                    .thenComparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getTimestamp()));

    harness.close();
}
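
The expected timestamps 2999 and 5999 in the test above follow from how tumbling windows are aligned: each emitted record carries the window's maximum timestamp, which is the window end minus one millisecond. The sketch below reproduces that arithmetic in plain Java, with no Flink dependency. The class and method names are hypothetical; the formula mirrors the one used by TimeWindow.getWindowStartWithOffset in Flink 1.11, and the offset of 0 is an assumption that matches TumblingProcessingTimeWindows.of(Time.seconds(3)).

```java
// Plain-Java model of tumbling-window alignment (hypothetical helper, not a Flink class).
class TumblingWindowMath {

    // Start of the window containing the timestamp, for non-negative timestamps.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Last millisecond that still belongs to the window [start, start + windowSize).
    static long maxTimestamp(long start, long windowSize) {
        return start + windowSize - 1;
    }

    public static void main(String[] args) {
        long size = 3000;  // 3-second window, as in the test

        // Elements pushed at processing time 10 fall into [0, 3000).
        long firstStart = windowStart(10, 0, size);
        System.out.println(firstStart + " .. " + maxTimestamp(firstStart, size));

        // Elements pushed at processing time 3500 fall into [3000, 6000).
        long secondStart = windowStart(3500, 0, size);
        System.out.println(secondStart + " .. " + maxTimestamp(secondStart, size));
    }
}
```

This is also why setProcessingTime(3500) and setProcessingTime(7000) are enough to fire the two windows: each value is past the corresponding maximum timestamp.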

Attention points:

  • The accumulator type of the WindowOperator is Iterable<WordCountPojo>, NOT simply WordCountPojo. This is because my Counter's process() method receives an Iterable rather than a single WordCountPojo (remember that Counter extends ProcessWindowFunction).
  • The WindowOperator's state descriptor parameter is a ListStateDescriptor, which means the window simply collects the WordCountPojos it receives. (The WindowOperatorTest example uses a ReducingStateDescriptor that reduces by sum, but I don't need that because the counting is done by Counter, which is the function I want to test.)
  • The WindowOperator's internal window function parameter is of type InternalIterableProcessWindowFunction. This function wraps my Counter function and is invoked when the window is triggered. Because the window accumulates an Iterable<WordCountPojo> through the ListStateDescriptor, this Iterable is passed as the input parameter of Counter's process() method when it is invoked.
Vin
  • Is there any way to do this in Scala? I found no way to wrap a `org.apache.flink.streaming.api.scala.function.ProcessWindowFunction` with something like `InternalIterableProcessWindowFunction`, needed to construct a `WindowOperator`, as this requires a `org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction`. – Michele Guerriero Jan 23 '23 at 14:10