
I know that there is a similar question here: Flink Unit Test over ProcessWindowFunction<IN, OUT, KEY, W>, but it does not solve my problem.

Here's my problem:

I'm trying to write a unit test for an Apache Flink pipeline that applies a window operator to a data stream.

I have the following code:

    inputStream
        .keyBy(Event::getSessionId)
        .window(SlidingProcessingTimeWindows.of(
                Time.milliseconds(2000),
                Time.milliseconds(200)
        ))
        .process(new ProcessWindowFunction<Event, Feature, Long, TimeWindow>() {
            @Override
            public void process(Long sessionId, Context context, Iterable<Event> elements, Collector<Feature> out) {
                // process window of events here
                out.collect(Feature.newBuilder().build());
            }
        });

and I'm testing it like this:

    @Test
    public void shouldRunPipeline() throws Exception {
        long sessionId = 1;
        long userId = 1;

        List<Event> events = new ArrayList<>();

        events.add(createAccEvent(sessionId, userId, createTsNow(), 1f, 2f, 3f));
        events.add(createAccEvent(sessionId, userId, createTsNow(), 4f, 5f, 6f));
        events.add(createGyroEvent(sessionId, userId, createTsNow(), 7f, 8f, 9f));
        events.add(createGyroEvent(sessionId, userId, createTsNow(), 10f, 11f, 12f));

        events.add(createTouchDownEvent(sessionId, userId, createTsNow()));

        events.add(createAccEvent(sessionId, userId, createTsNow(), 1f, 2f, 3f));
        events.add(createAccEvent(sessionId, userId, createTsNow(), 4f, 5f, 6f));
        events.add(createGyroEvent(sessionId, userId, createTsNow(), 7f, 8f, 9f));
        events.add(createGyroEvent(sessionId, userId, createTsNow(), 10f, 11f, 12f));

        events.add(createTouchUpEvent(sessionId, userId, createTsNow()));

        events.add(createAccEvent(sessionId, userId, createTsNow(), 1f, 2f, 3f));
        events.add(createAccEvent(sessionId, userId, createTsNow(), 4f, 5f, 6f));
        events.add(createGyroEvent(sessionId, userId, createTsNow(), 7f, 8f, 9f));
        events.add(createGyroEvent(sessionId, userId, createTsNow(), 10f, 11f, 12f));

        CollectSink.values.clear();

        DataStream<Event> source = getTestEnv()
                .fromCollection(events)
                .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

        getPipeline()
                .process(source)
                .addSink(new CollectSink());

        getTestEnv().execute();

        List<Feature> features = CollectSink.values;

        assertThat(features).isNotEmpty();
    }

    private long createTsNow() {
        return Instant.now().toEpochMilli();
    }

    private static class CollectSink implements SinkFunction<Feature> {

        public static final List<Feature> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Feature value, Context context) {
            values.add(value);
        }
    }

The getPipeline() method creates my pipeline, getTestEnv() returns the test environment from the base test class, and the createXEvent() methods are my helpers for building the events of the test's input stream.

I used a sink in the test because I read somewhere that DataStreamUtils.collect() may not work with a window operator, since windowed data is accumulated differently.

I ran this test in the debugger, and the ProcessWindowFunction is never reached!

Once I remove the window operator, the process function (a KeyedProcessFunction in that case) is reached, as shown in the snippet below.

    inputStream
        .keyBy(Event::getSessionId)
        .process(new KeyedProcessFunction<Long, Event, Feature>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Feature> out) {
                // process events here
                out.collect(Feature.newBuilder().build());
            }
        });
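The difference between the two snippets can be illustrated without Flink. The KeyedProcessFunction emits once per element, whereas with SlidingProcessingTimeWindows.of(2000 ms, 200 ms) each element is buffered into size / slide = 10 overlapping windows, and the ProcessWindowFunction runs only when the operator's processing-time clock reaches the end of one of them. The plain-Java sketch below re-implements the window-start arithmetic as I understand Flink's sliding assigners to use it (offset 0); SlidingWindowMath and its methods are hypothetical names, not Flink API. For a processing-time assigner, the timestamp fed in is the wall-clock time at which the element reaches the window operator, not the event's own timestamp.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mimicking sliding-window assignment with offset 0; not Flink API. */
final class SlidingWindowMath {

    private SlidingWindowMath() {}

    /** Start of the latest sliding window that contains {@code timestamp}. */
    static long lastWindowStart(long timestamp, long slide) {
        // floorMod keeps the result aligned to multiples of 'slide', also for negative values
        return timestamp - Math.floorMod(timestamp, slide);
    }

    /** Returns [start, end) of every sliding window containing {@code timestamp}, latest first. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    /** End of the earliest window holding this element, i.e. when the first of its windows can fire. */
    static long earliestWindowEnd(long timestamp, long size, long slide) {
        List<long[]> windows = assignWindows(timestamp, size, slide);
        return windows.get(windows.size() - 1)[1];
    }
}
```

For example, an element arriving at processing time 1050 ms lands in 10 windows, from [1000, 3000) down to [-800, 1200), so no window containing it can fire before the operator's processing-time clock reaches about 1200 ms.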

My question is: how do I write a correct unit test for a stream that uses a window operator, so that the ProcessWindowFunction instance passed to process() is actually invoked?

I could not find a working solution to this problem in the Apache Flink documentation or elsewhere on the web.

I'm able to unit test the logic of the process function separately, but I'd like to test the whole pipeline too.
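Testing the window logic separately is easiest when the body of process() is moved into a static, Flink-free method that the ProcessWindowFunction merely delegates to. A minimal sketch, assuming hypothetical stand-in Event and Feature classes and a placeholder computation (counting the events in the window), since the real feature logic is not shown:

```java
/** Hypothetical container for window logic that has no Flink dependency. */
final class WindowLogic {

    /** Hypothetical stand-in for the question's Event type. */
    static final class Event {
        final long sessionId;
        final long timestamp;

        Event(long sessionId, long timestamp) {
            this.sessionId = sessionId;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical stand-in for the question's Feature type. */
    static final class Feature {
        final long sessionId;
        final int eventCount;

        Feature(long sessionId, int eventCount) {
            this.sessionId = sessionId;
            this.eventCount = eventCount;
        }
    }

    private WindowLogic() {}

    /**
     * Pure per-window computation. The ProcessWindowFunction would only call this and
     * pass the result to out.collect(...). Counting events is a placeholder computation.
     */
    static Feature computeFeature(long sessionId, Iterable<Event> windowContents) {
        int count = 0;
        for (Event ignored : windowContents) {
            count++;
        }
        return new Feature(sessionId, count);
    }
}
```

Inside the ProcessWindowFunction, process(...) would then reduce to out.collect(WindowLogic.computeFeature(sessionId, elements)), and computeFeature can be checked with ordinary JUnit assertions on a hand-built list of events. This covers the window body only; whether the window fires at all still has to be tested through the pipeline.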

I would appreciate any help.

Regards,

Piotr

Piotr Wittchen
  • It looks strange to me that you use `SlidingProcessingTimeWindows` and also assign watermarks to the events. Shouldn't you use `SlidingEventTimeWindows` instead? – Felipe Jun 18 '21 at 06:32
  • I'm not sure. Nevertheless, when I remove this assignment, the effect is the same (the process function is not invoked and there is no data on the output). – Piotr Wittchen Jun 18 '21 at 07:59
  • Just FYI: I ended up testing only the process function instead of the whole pipeline/stream, using plain Java testing tools such as JUnit and assertion libraries. – Piotr Wittchen Aug 08 '21 at 09:52
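Felipe's suggestion can be illustrated with a plain-Java model of the event-time firing rule (no Flink dependency; EventTimeFiring and firedWindows are hypothetical names). An event-time window [start, end) fires once the watermark reaches its max timestamp, end - 1. As far as I know, Flink also emits a final Long.MAX_VALUE watermark when a bounded source such as fromCollection() finishes, which would flush every pending event-time window. Processing-time windows ignore watermarks, so that flush does not apply to them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical model of event-time firing for sliding windows with offset 0; not Flink API. */
final class EventTimeFiring {

    private EventTimeFiring() {}

    /** Returns [start, end) of every window that has fired once the watermark reaches {@code watermark}. */
    static List<long[]> firedWindows(long[] eventTimestamps, long size, long slide, long watermark) {
        // Collect the distinct window starts that the events were assigned to
        TreeSet<Long> starts = new TreeSet<>();
        for (long ts : eventTimestamps) {
            long lastStart = ts - Math.floorMod(ts, slide);
            for (long start = lastStart; start > ts - size; start -= slide) {
                starts.add(start);
            }
        }
        List<long[]> fired = new ArrayList<>();
        for (long start : starts) {
            long maxTimestamp = start + size - 1; // last timestamp that still belongs to the window
            if (maxTimestamp <= watermark) {
                fired.add(new long[] {start, start + size});
            }
        }
        return fired;
    }
}
```

For three events stamped at 1050, 1060 and 1070 ms with a 2000 ms / 200 ms sliding window, a watermark of 1070 fires nothing, a watermark of 1199 fires only the window [-800, 1200), and Long.MAX_VALUE fires all 10 windows.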
