I know that there is a similar question here: Flink Unit Test over ProcessWindowFunction<IN, OUT, KEY, W>, but it does not solve my problem.
Here's my problem:
I'm trying to write a unit test for an Apache Flink pipeline in which a data stream goes through a `window` operator.
I have the following code:
```java
inputStream
        .keyBy(Event::getSessionId)
        .window(SlidingProcessingTimeWindows.of(
                Time.milliseconds(2000),
                Time.milliseconds(200)
        ))
        .process(new ProcessWindowFunction<Event, Feature, Long, TimeWindow>() {
            @Override
            public void process(Long aLong, Context context, Iterable<Event> elements, Collector<Feature> out) {
                // process window of events here
                out.collect(Feature.newBuilder().build());
            }
        });
```
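For reference, a plain-Java sketch of what the sliding window clause implies: with a 2000 ms size and a 200 ms slide, each element belongs to 2000 / 200 = 10 overlapping windows. The arithmetic is assumed to mirror Flink's sliding assigners with a zero offset; `SlidingWindowMath`, `Window` and `assign` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java helper mimicking how a sliding window assigner
// (assumed zero offset) maps one timestamp to its overlapping windows.
public class SlidingWindowMath {

    // Half-open window [start, end), similar in spirit to Flink's TimeWindow.
    public static final class Window {
        public final long start;
        public final long end;

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Last millisecond that still belongs to the window.
        public long maxTimestamp() {
            return end - 1;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    public static List<Window> assign(long timestamp, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        // Start of the most recent window that contains the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Window> windows = assign(10_050L, 2000L, 200L);
        System.out.println(windows.size()); // 10
        System.out.println(windows.get(0)); // [10000, 12000)
        System.out.println(windows.get(9)); // [8200, 10200)
    }
}
```

The window that closes first, `[8200, 10200)`, ends at the next 200 ms boundary after the element, so no window containing an element can be complete less than one slide after that element arrives.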
I'm testing it like this:
```java
@Test
public void shouldRunPipeline() throws Exception {
    long sessionId = 1;
    long userId = 1;
    List<Event> events = new ArrayList<>();
    events.add(createAccEvent(sessionId, userId, createTsNow(), 1f, 2f, 3f));
    events.add(createAccEvent(sessionId, userId, createTsNow(), 4f, 5f, 6f));
    events.add(createGyroEvent(sessionId, userId, createTsNow(), 7f, 8f, 9f));
    events.add(createGyroEvent(sessionId, userId, createTsNow(), 10f, 11f, 12f));
    events.add(createTouchDownEvent(sessionId, userId, createTsNow()));
    events.add(createAccEvent(sessionId, userId, createTsNow(), 1f, 2f, 3f));
    events.add(createAccEvent(sessionId, userId, createTsNow(), 4f, 5f, 6f));
    events.add(createGyroEvent(sessionId, userId, createTsNow(), 7f, 8f, 9f));
    events.add(createGyroEvent(sessionId, userId, createTsNow(), 10f, 11f, 12f));
    events.add(createTouchUpEvent(sessionId, userId, createTsNow()));
    events.add(createAccEvent(sessionId, userId, createTsNow(), 1f, 2f, 3f));
    events.add(createAccEvent(sessionId, userId, createTsNow(), 4f, 5f, 6f));
    events.add(createGyroEvent(sessionId, userId, createTsNow(), 7f, 8f, 9f));
    events.add(createGyroEvent(sessionId, userId, createTsNow(), 10f, 11f, 12f));

    CollectSink.values.clear();

    DataStream<Event> source = getTestEnv()
            .fromCollection(events)
            .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

    getPipeline()
            .process(source)
            .addSink(new CollectSink());

    getTestEnv().execute();

    List<Feature> features = CollectSink.values;
    assertThat(features).isNotEmpty();
}

private long createTsNow() {
    return Instant.now().toEpochMilli();
}

private static class CollectSink implements SinkFunction<Feature> {

    // static, so the list is shared with the copy of the sink that Flink actually runs
    public static final List<Feature> values = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(Feature value, Context context) {
        values.add(value);
    }
}
```
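Why `values` has to be `static`: Flink serializes user functions such as the sink when the job is built and runs deserialized copies, so an instance field on the object created in the test would never see any records. The plain-Java sketch below imitates that round trip with standard Java serialization; `StaticSinkDemo`, `FakeSink` and `roundTrip` are hypothetical names used only for illustration, and sharing static state only works because a local test environment runs in the same JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StaticSinkDemo {

    // Hypothetical stand-in for a sink function: one instance field, one static field.
    public static class FakeSink implements Serializable {
        public static final List<String> shared = Collections.synchronizedList(new ArrayList<>());
        public final List<String> perInstance = new ArrayList<>();

        public void invoke(String value) {
            perInstance.add(value);
            shared.add(value);
        }
    }

    // Serialize and deserialize an object, roughly what happens when a function is shipped to a task.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeSink inTest = new FakeSink();    // the object the test holds
        FakeSink inTask = roundTrip(inTest); // the copy that actually receives records
        inTask.invoke("feature-1");
        System.out.println(inTest.perInstance.size()); // 0: the test's object saw nothing
        System.out.println(FakeSink.shared.size());    // 1: static state is shared within the JVM
    }
}
```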
The method `getPipeline()` creates my pipeline, `getTestEnv()` returns the test environment from the base test class, and the `createXEvent()` methods are my helpers for creating events for the test's input stream.
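Since every `createXEvent()` call takes its timestamp from `createTsNow()`, the events of one run get nearly identical, wall-clock-dependent timestamps, so any time-based assertion is hard to reproduce. These timestamps are what `assignTimestampsAndWatermarks` exposes to event-time operators; processing-time windows use the machine clock instead. A minimal sketch of a deterministic alternative, assuming the helpers accept any epoch-millisecond `long`: `TestTimestamps` is a hypothetical helper, not part of the original code.

```java
// Hypothetical deterministic replacement for createTsNow(): evenly spaced, reproducible timestamps.
public class TestTimestamps {
    private final long stepMillis;
    private long nextTs;

    public TestTimestamps(long baseMillis, long stepMillis) {
        this.nextTs = baseMillis;
        this.stepMillis = stepMillis;
    }

    // Returns the current timestamp, then advances by one step.
    public long next() {
        long ts = nextTs;
        nextTs += stepMillis;
        return ts;
    }

    public static void main(String[] args) {
        TestTimestamps ts = new TestTimestamps(1_000L, 100L);
        System.out.println(ts.next()); // 1000
        System.out.println(ts.next()); // 1100
        System.out.println(ts.next()); // 1200
    }
}
```

In the test this would read, for example, `events.add(createAccEvent(sessionId, userId, ts.next(), 1f, 2f, 3f));`, giving every run the same timeline.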
I used a sink in the test because I read somewhere that `DataStreamUtils.collect()` may not work with the `window` operator, since data is accumulated differently in that case.
I ran this test in the debugger, and the `ProcessWindowFunction` is never reached!
Once I remove the `window` operator, the process function (this time a `KeyedProcessFunction`) is reached. This variant is shown in the snippet below.
```java
inputStream
        .keyBy(Event::getSessionId)
        .process(new KeyedProcessFunction<Long, Event, Feature>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Feature> out) {
                // process events here
                out.collect(Feature.newBuilder().build());
            }
        });
```
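One way to reason about the difference between the two snippets: `processElement()` runs once per incoming element, while a window function runs only when the window's trigger fires, which for processing-time windows means a timer at the window's last millisecond on the machine clock. The sketch below computes the earliest such firing time for one element and checks it against the moment the job stops. It rests on two assumptions worth verifying for your Flink version: a zero window offset with the size a multiple of the slide, and that processing-time timers still pending when a bounded job shuts down are not fired. `ProcessingTimeFiring` and its methods are hypothetical names.

```java
// Hypothetical model of when a sliding processing-time window can first fire.
public class ProcessingTimeFiring {

    // The window that closes first ends at the next multiple of `slide` after the
    // element's arrival; its timer is set to that end minus one millisecond.
    public static long firstFireTime(long arrivalMillis, long slide) {
        long nextBoundary = arrivalMillis - Math.floorMod(arrivalMillis, slide) + slide;
        return nextBoundary - 1;
    }

    // True if at least one window would have fired before the job stopped,
    // assuming timers still pending at shutdown are dropped.
    public static boolean anyWindowFired(long arrivalMillis, long jobEndMillis, long slide) {
        return firstFireTime(arrivalMillis, slide) <= jobEndMillis;
    }

    public static void main(String[] args) {
        long arrival = 10_050L;
        System.out.println(firstFireTime(arrival, 200L));                 // 10199
        // Job ends 20 ms after the element arrives: nothing has fired yet.
        System.out.println(anyWindowFired(arrival, arrival + 20, 200L));  // false
        // Job still running 3 s later: the first window has fired.
        System.out.println(anyWindowFired(arrival, arrival + 3000, 200L)); // true
    }
}
```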
My question is: how do I write a correct unit test for a stream that uses the `window` operator, so that the `ProcessWindowFunction` instance passed to the `process` method is actually reached?
I could not find a working solution to this problem in the Apache Flink documentation or on the web.
I'm able to unit test the logic of the process function separately, but I'd like to test the whole pipeline too.
I would appreciate any help.
Regards,
Piotr