
My Flink job runs as a task in a workflow on a GCP Dataproc cluster. The workflow task has a timeout, and when that timeout expires the DAG is forcibly terminated.

When that happens, the close() and cancel() methods of my RichParallelSourceFunction never seem to run.

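import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Data and SourceFunction are my own classes. SourceFunction here is a custom
// helper (init/getData/updateTimestamp/close), not Flink's SourceFunction interface.
public class SampleSource extends RichParallelSourceFunction<Data> {

    private static final Logger logger = LoggerFactory.getLogger(SampleSource.class);
    // Cleared to stop the polling loop in run().
    private volatile boolean running;
    private SourceFunction sf;

    @Override
    public void open(Configuration parameters) throws Exception {
        running = true;
        sf = new SourceFunction();
        sf.init();
    }

    @Override
    public void run(SourceContext<Data> ctx) throws Exception {
        // Poll the helper every 10 seconds until the source is stopped.
        while (running) {
            List<Data> data = sf.getData();
            data.forEach(ctx::collect);
            Thread.sleep(10_000L);
        }
    }

    @Override
    public void cancel() {
        try {
            logger.info("[Job Cancelled]");
            sf.updateTimestamp();
            this.close();
        } catch (Exception e) {
            logger.error("cancel() failed", e);
        }
    }

    @Override
    public void close() throws Exception {
        try {
            logger.info("[Job Closed]");
            running = false;
            sf.close();
            super.close();
        } catch (Exception e) {
            logger.error("close() failed", e);
        } finally {
            sf.updateTimestamp();
        }
    }
}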
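For comparison, here is a minimal plain-Java model of the cancel()/close() split as I understand the Javadoc of Flink's SourceFunction: cancel() should only make run() leave its loop (typically by clearing a volatile flag), and cleanup belongs in close(). Because the code above has cancel() call close() itself, and Flink normally calls close() again once run() returns, the cleanup is made idempotent here. CancellableSource, emitted and releases are hypothetical names, nothing in this sketch uses Flink's API, and the 10 ms sleep stands in for the 10-second poll.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java model of the cancel()/close() split; it does not use Flink's API.
public class CancellableSource {
    // Written by cancel() on another thread and read by the run() loop.
    private volatile boolean running = true;
    // Ensures the cleanup body executes at most once, whoever calls close() first.
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Hypothetical counters used only to observe the behavior.
    final AtomicInteger emitted = new AtomicInteger();
    final AtomicInteger releases = new AtomicInteger();

    // Mirrors run(): keep polling until cancel() clears the flag.
    public void run() {
        while (running) {
            emitted.incrementAndGet();          // stand-in for ctx.collect(...)
            try {
                Thread.sleep(10);               // stand-in for the 10-second poll
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;                         // an interrupt also ends the loop
            }
        }
    }

    // Mirrors cancel(): only signal the loop; no cleanup and no call to close().
    public void cancel() {
        running = false;
    }

    // Mirrors close(): idempotent cleanup, safe to call more than once.
    public void close() {
        if (closed.compareAndSet(false, true)) {
            releases.incrementAndGet();         // stand-in for sf.updateTimestamp() and sf.close()
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CancellableSource src = new CancellableSource();
        Thread worker = new Thread(src::run);
        worker.start();
        Thread.sleep(50);
        src.cancel();   // a graceful cancellation signals the loop first
        worker.join();  // run() returns within one poll interval
        src.close();    // then cleanup runs
        src.close();    // a repeated call is a no-op
        System.out.println("emitted=" + src.emitted.get() + ", releases=" + src.releases.get());
    }
}
```

This only covers a graceful cancellation, for example a job cancelled from the Flink CLI or web UI. If the timeout kills the TaskManager process instead, neither method gets a chance to run, which would be consistent with the log below.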

When I check the log, there is no sign that close() or cancel() ran. The only shutdown activity comes from TaskExecutor components such as TaskExecutorStateChangelogStoragesManager:

2023-06-08 08:03:21,169 INFO state.TaskExecutorStateChangelogStoragesManager: Shutting down TaskExecutorStateChangelogStoragesManager.
2023-06-08 08:03:21,170 INFO state.TaskExecutorLocalStateStoresManager: Shutting down TaskExecutorLocalStateStoresManager.
2023-06-08 08:03:21,169 INFO blob.PermanentBlobCache: Shutting down BLOB cache
2023-06-08 08:03:21,170 INFO blob.TransientBlobCache: Shutting down BLOB cache
2023-06-08 08:03:21,170 INFO filecache.FileCache: removed file cache directory /tmp/flink-dist-cache-...
2023-06-08 08:03:21,173 INFO disk.FileChannelManagerImpl: FileChannelManager removed spill file directory /tmp/flink-netty-shuffle-...

I want sf.close() to run as the last step before the job is terminated. How can I make that happen?
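One possible fallback, assuming the timeout ends with the TaskManager JVM receiving SIGTERM rather than SIGKILL, is a JVM shutdown hook that runs the same cleanup. The log lines above (BLOB cache, file cache and spill directories being cleaned up) may come from Flink's own shutdown-hook cleanup, which would suggest the process was asked to exit rather than killed outright, but I have not confirmed that for Dataproc. The sketch below uses only the JDK. OnceCleanup and closeNow() are hypothetical names; the idea would be to create one in open() wrapping sf.close() and call closeNow() from close().

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: runs a cleanup action at most once, either from close()
// or from a JVM shutdown hook, whichever happens first.
public final class OnceCleanup implements Runnable {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Runnable action;
    private final Thread hook;

    public OnceCleanup(Runnable action) {
        this.action = action;
        // The JVM starts this thread on normal exit or on SIGTERM/SIGINT,
        // but never on SIGKILL.
        this.hook = new Thread(this, "source-cleanup-hook");
        Runtime.getRuntime().addShutdownHook(hook);
    }

    @Override
    public void run() {
        if (done.compareAndSet(false, true)) {
            action.run();
        }
    }

    // Intended to be called from close(): clean up now and unregister the hook.
    public void closeNow() {
        run();
        try {
            Runtime.getRuntime().removeShutdownHook(hook);
        } catch (IllegalStateException e) {
            // The JVM is already shutting down, so the hook has started or will start;
            // the AtomicBoolean keeps the action from running twice.
        }
    }

    public static void main(String[] args) {
        new OnceCleanup(() -> System.out.println("cleanup ran from the shutdown hook"));
        // main() returns without calling closeNow(), so the hook runs as the JVM exits.
    }
}
```

A hook gets at most a short window before any forced kill, and none at all on SIGKILL, so the cleanup it triggers should be quick. Removing the hook in closeNow() matters on a long-running cluster, because a registered hook keeps a reference to the job's user code alive.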

염경훈