I found a solution, inspired by this method: https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L1340

In my case, I have to test a tumbling processing-time window (TumblingProcessingTimeWindows) where the process() operator uses a ProcessWindowFunction to count words while keeping the previous window's count (i.e. not resetting the count each time the window is triggered).

WordCountPojo is a simple POJO with two fields, word and count (you can use Tuple2 if you prefer).
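For reference, a minimal sketch of what such a POJO could look like. Only the two field names and the getWord() accessor come from the test; the int type for count, the no-argument constructor, the setters, and equals()/hashCode()/toString() are assumptions. Flink's POJO rules expect a public class with a public no-argument constructor and accessible fields, and as far as I can tell the harness assertions compare records with equals(), so the class needs a value-based equals().

```java
import java.util.Objects;

// Hypothetical sketch of the POJO used in the test; adapt it to your own class.
public class WordCountPojo {
    private String word;
    private int count; // assumption: the count fits in an int

    // Public no-argument constructor, needed for Flink to treat this as a POJO
    public WordCountPojo() {}

    public WordCountPojo(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    // Value-based equality so expected and actual records can be compared
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof WordCountPojo)) return false;
        WordCountPojo other = (WordCountPojo) o;
        return count == other.count && Objects.equals(word, other.word);
    }

    @Override
    public int hashCode() {
        return Objects.hash(word, count);
    }

    @Override
    public String toString() {
        return "WordCountPojo{word='" + word + "', count=" + count + "}";
    }
}
```

Without equals(), two instances holding the same word and count would be compared by reference, and an assertion on expected versus actual output would fail even when the values match.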
This is the test I wrote:
@Test
void testCounter() throws Exception {
    // Create a WindowOperator<Key, Input, Accumulator, Output, Window>
    WindowOperator<String, WordCountPojo, Iterable<WordCountPojo>, WordCountPojo, TimeWindow> operator =
        new WindowOperator<>(
            TumblingProcessingTimeWindows.of(Time.seconds(3)), // window assigner
            new TimeWindow.Serializer(), // window serializer
            WordCountPojo::getWord, // key selector
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), // key serializer
            new ListStateDescriptor<>( // window state descriptor (accumulates events inside the window)
                "window-content",
                TypeInformation.of(WordCountPojo.class).createSerializer(new ExecutionConfig())), // input serializer
            new InternalIterableProcessWindowFunction<>(new Counter()), // my custom ProcessWindowFunction to invoke
            ProcessingTimeTrigger.create(), // window trigger
            0, // allowed lateness
            null); // late data output tag (not used)

    // Flink test harness
    OneInputStreamOperatorTestHarness<WordCountPojo, WordCountPojo> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(operator, WordCountPojo::getWord, BasicTypeInfo.STRING_TYPE_INFO);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    harness.open();
    harness.setProcessingTime(10);

    // Push data into the window
    harness.processElement(new StreamRecord<>(new WordCountPojo("to", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("be", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("or", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("not", 1)));

    harness.setProcessingTime(3500); // advance processing time to trigger the window

    // Expected result
    expected.add(new StreamRecord<>(new WordCountPojo("to", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("be", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("or", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("not", 1), 2999));

    TestHarnessUtil.assertOutputEqualsSorted("Output not equal to expected", expected, harness.getOutput(),
        Comparator.comparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getValue().getWord())
            .thenComparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getTimestamp()));

    // Push more WordCountPojos to test the global counting
    harness.processElement(new StreamRecord<>(new WordCountPojo("to", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("be", 1)));

    harness.setProcessingTime(7000); // trigger the window again

    // Expected result
    expected.add(new StreamRecord<>(new WordCountPojo("to", 2), 5999));
    expected.add(new StreamRecord<>(new WordCountPojo("be", 2), 5999));

    TestHarnessUtil.assertOutputEqualsSorted("Output not equal to expected", expected, harness.getOutput(),
        Comparator.comparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getValue().getWord())
            .thenComparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getTimestamp()));

    harness.close();
}
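The expected timestamps 2999 and 5999 follow from how tumbling windows are aligned. With a 3-second size and no offset, the windows cover [0, 3000), [3000, 6000) and so on, and the records emitted by the operator carry the window's maximum timestamp (end - 1). Below is a plain-Java sketch of that arithmetic. It is not Flink code, the class and method names are made up for illustration, and it assumes non-negative timestamps and a zero offset.

```java
// Hypothetical helper that mirrors tumbling-window alignment for non-negative timestamps.
final class TumblingWindowMath {

    // Start of the window that contains the given timestamp (inclusive)
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // End of the window (exclusive)
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    // Largest timestamp that still belongs to the window
    static long maxTimestamp(long timestamp, long size) {
        return windowEnd(timestamp, size) - 1;
    }

    public static void main(String[] args) {
        long size = 3000; // 3 seconds in milliseconds

        // First batch is processed at processing time 10
        System.out.println(maxTimestamp(10, size));   // prints 2999

        // Second batch is processed at processing time 3500
        System.out.println(maxTimestamp(3500, size)); // prints 5999
    }
}
```

This is also why setProcessingTime(3500) is enough to fire the first window and setProcessingTime(7000) is enough to fire the second: each call moves the clock past the corresponding maximum timestamp.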
Attention points:
- The accumulator type for the WindowOperator is Iterable<WordCountPojo> and NOT simply WordCountPojo. This is because my Counter's process() method receives an Iterable and not a single WordCountPojo (remember that Counter extends ProcessWindowFunction).
- The WindowOperator's state descriptor parameter is a ListStateDescriptor, which means that the window buffers every WordCountPojo it receives. (The WindowOperatorTest example uses a ReducingStateDescriptor that reduces by sum, but I don't need that because Counter is the function I want to test.)
- The WindowOperator's internal window function parameter is of type InternalIterableProcessWindowFunction. This function wraps my Counter function and is invoked when the window is triggered. Because the window accumulates an Iterable<WordCountPojo> collected through the ListStateDescriptor, that Iterable is passed as the input parameter of process() when Counter is invoked.
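
To make the behaviour under test concrete, here is a plain-Java model of a counter that receives the buffered Iterable for one key each time a window fires and adds it to a total that survives across windows. This is only a sketch: RunningCounter and its HashMap are hypothetical stand-ins for the real ProcessWindowFunction and its keyed state, and no Flink classes are involved.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a per-key counter that is NOT reset between window firings.
final class RunningCounter {
    // Stand-in for state that outlives a single window
    private final Map<String, Integer> totals = new HashMap<>();

    // Called once per key when a window fires; counts holds the buffered values for that key
    int process(String word, Iterable<Integer> counts) {
        int total = totals.getOrDefault(word, 0);
        for (int c : counts) {
            total += c;
        }
        totals.put(word, total);
        return total;
    }

    public static void main(String[] args) {
        RunningCounter counter = new RunningCounter();
        // First window: "to" was seen once
        System.out.println(counter.process("to", Arrays.asList(1))); // prints 1
        // Second window: "to" was seen once more, so the running total is 2
        System.out.println(counter.process("to", Arrays.asList(1))); // prints 2
    }
}
```

The two printed values correspond to the ("to", 1) record expected after the first trigger and the ("to", 2) record expected after the second one.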