0

I have written an integration test in Flink 1.12.3 that tests the execute method of the StreamingJob class. Surprisingly, this method outputs records to the sink successfully in the production environment, but it doesn't output anything in local tests. How can I solve this and enable testing?

This may be related

    // Sink instance shared by the test class; it is handed to the StreamingJob under test and
    // its `values` field is printed after the job has executed.
    private static final DeviceIdSink deviceIdSink = new DeviceIdSink();
    
    // JUnit class rule providing a MiniClusterWithClientResource for the tests in this class,
    // configured with 2 task managers that each offer 1 slot.
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(1)
                            .setNumberTaskManagers(2)
                            .build());
    
    @Test
    public void testingAStreamingJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        
        List<JsonNode> events = getListFromResource("events.json");
        DataStream<JsonNode> testStream = env.fromCollection(events);

        StreamingJob job = new StreamingJob(env, Time.seconds(60),
                 testStream, deviceIdSink);


        job.execute();

        System.out.println(deviceIdSink.values);


    ```
Leukonoe
  • 649
  • 2
  • 10
  • 29

1 Answers1

0

Once the testStream source is exhausted, the job will terminate. So if you have any time-based windowing happening, you'll have pending results that never get emitted.

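I use a MockSource so that the source doesn't terminate until the cancel() method is called. (Note that the version shown below returns from run() as soon as it has emitted all of its elements or has been cancelled; to keep the job alive until pending windows fire, run() has to keep waiting until cancel() is called.) For example: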

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A very simple, non-parallel source based on a list of elements. You can optionally specify a
 * delay to wait after each element is emitted (see {@link #setDelay(Time)}).
 *
 * <p>The elements are serialized with the serializer created from the supplied (or extracted)
 * type information when the source is constructed, and are deserialized one at a time when
 * {@link #run(SourceContext)} executes. {@code run()} returns once every element has been
 * emitted or {@link #cancel()} has been called, whichever happens first.
 *
 * @param <T> the type of the elements emitted by this source
 */
@SuppressWarnings("serial")
public class MockSource<T> implements SourceFunction<T>, ResultTypeQueryable<T>, Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(MockSource.class);

    private int listSize;
    private byte[] elementsSerialized;
    private TypeInformation<T> typeInfo;
    private TypeSerializer<T> serializer;
    private Time delay = null;

    // Starts out true and is only ever cleared by cancel(), so a cancel() that arrives before
    // run() begins is not lost (run() must not reset this flag).
    private volatile boolean running = true;

    /**
     * Creates a source that emits no elements.
     *
     * @param typeInfo type information for the (absent) elements
     * @throws IOException declared for consistency with the other constructors
     */
    public MockSource(TypeInformation<T> typeInfo) throws IOException {
        this(Collections.emptyList(), typeInfo);
    }

    /**
     * Creates a source from the given elements; the type information is extracted from the
     * first element, so at least one element is required.
     *
     * @param elements the elements to emit, in order
     * @throws IOException if serializing the elements fails
     * @throws IllegalArgumentException if no elements are given
     */
    @SafeVarargs // the array is only read (wrapped in a list view), never written to or exposed
    public MockSource(T... elements) throws IOException {
        this(Arrays.asList(elements));
    }

    /**
     * Creates a source from {@code data}, which cannot be empty (if it is, use the constructor
     * that takes a {@code typeInfo} argument instead).
     *
     * @param data the elements to emit, in order
     * @throws IOException if serializing the elements fails
     * @throws IllegalArgumentException if {@code data} is null or empty
     */
    public MockSource(List<T> data) throws IOException {
        this(data, typeInfoFromFirstElement(data));
    }

    /**
     * Creates a source from {@code data}, using {@code typeInfo} to create the serializer.
     *
     * @param data the elements to emit, in order; may be empty
     * @param typeInfo type information for the elements
     * @throws IOException if serializing the elements fails
     */
    public MockSource(List<T> data, TypeInformation<T> typeInfo) throws IOException {
        this.typeInfo = typeInfo;
        this.serializer = typeInfo.createSerializer(new ExecutionConfig());

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);

        listSize = 0;
        try {
            for (T element : data) {
                serializer.serialize(element, wrapper);
                listSize++;
            }
        } catch (Exception e) {
            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
        }

        this.elementsSerialized = baos.toByteArray();
    }

    /** Extracts type information from the first element, rejecting a null or empty list. */
    private static <E> TypeInformation<E> typeInfoFromFirstElement(List<E> data) {
        if (data == null || data.isEmpty()) {
            throw new IllegalArgumentException(
                    "Cannot extract type information from an empty list of elements; "
                            + "use the constructor that takes a TypeInformation argument");
        }
        return TypeExtractor.getForObject(data.get(0));
    }

    /**
     * Sets the delay to wait after each emitted element.
     *
     * @param delay the delay, or {@code null} for no delay
     * @return this source, to allow call chaining
     */
    public MockSource<T> setDelay(Time delay) {
        this.delay = delay;
        return this;
    }

    /**
     * Deserializes the stored elements one by one and emits each under the checkpoint lock,
     * optionally pausing after each element, until all elements have been emitted or the source
     * has been cancelled.
     *
     * @param ctx the context used to emit elements and obtain the checkpoint lock
     * @throws Exception if an element cannot be deserialized, or if the thread is interrupted
     *         while waiting out the delay
     */
    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        Object lock = ctx.getCheckpointLock();

        ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
        final DataInputView input = new DataInputViewStreamWrapper(bais);

        int i = 0;
        while (running && (i < this.listSize)) {
            T next;
            try {
                next = serializer.deserialize(input);
            } catch (Exception e) {
                throw new IOException("Failed to deserialize an element from the source. "
                        + "If you are using user-defined serialization (Value and Writable types), check the "
                        + "serialization functions.\nSerializer is " + serializer, e);
            }

            // Hold the checkpoint lock only while emitting and advancing the position.
            synchronized (lock) {
                ctx.collect(next);
                i++;
            }

            // Sleep outside the checkpoint lock so that waiting out the delay does not block
            // anything else that needs the lock in the meantime.
            if (delay != null) {
                LOGGER.debug("MockSource delaying for {}ms", delay.toMilliseconds());

                Thread.sleep(delay.toMilliseconds());
            }
        }
    }

    /** Requests that {@link #run(SourceContext)} stop emitting and return. */
    @Override
    public void cancel() {
        running = false;
    }

    /** Returns the type information this source was created with. */
    @Override
    public TypeInformation<T> getProducedType() {
        return typeInfo;
    }
}
kkrugler
  • 8,145
  • 6
  • 24
  • 18
  • Thanks @kkrugler. Is there a way to test an entire StreamingJob that consists of time-based windowing, one that would wait until the stream is fully processed before everything terminates? – Leukonoe Jun 17 '21 at 18:58
  • You can easily create a SourceFunction that doesn't exit from the run() method until after some AtomicBoolean is set. Setting that flag depends on your business logic, of course. – kkrugler Jun 18 '21 at 18:37
  • Thank you for your suggestion. Would you be so kind and present an example? I am not that experienced and I am still not sure how to do it. – Leukonoe Jun 21 '21 at 08:43
  • 1
    Added example of testing source – kkrugler Jun 22 '21 at 17:57