I have a DAG that, whenever FileSensor detects files, generates tasks for each file to (1) move the file to a staging area, and (2) trigger a separate DAG to process the file.
FileSensor -> Move(File1) -> TriggerDAG(File1) -> Done
          |-> Move(File2) -> TriggerDAG(File2) -^
In the DAG definition file, the middle tasks are generated by iterating over the directory that FileSensor is watching, a bit like this:
# def generate_move_task(f: Path) -> BashOperator
# def generate_dag_trigger(f: Path) -> TriggerDagRunOperator
with dag:
    for filepath in Path(WATCH_DIR).glob("*"):
        sensor_task >> generate_move_task(filepath) >> generate_dag_trigger(filepath)
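For context, the two helpers look roughly like this (a simplified sketch: the staging path, the downstream DAG id, and the Airflow 2-style import paths are stand-ins for my actual setup):

```python
from pathlib import Path

from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

STAGING_DIR = "/data/staging"       # placeholder staging area
PROCESSING_DAG_ID = "process_file"  # placeholder id of the per-file processing DAG

def generate_move_task(f: Path) -> BashOperator:
    # One move task per file; the task_id is derived from the filename so it is unique.
    return BashOperator(
        task_id=f"move_{f.stem}",
        bash_command=f"mv '{f}' '{STAGING_DIR}/'",
    )

def generate_dag_trigger(f: Path) -> TriggerDagRunOperator:
    # Trigger the processing DAG, handing over the staged file path via conf.
    return TriggerDagRunOperator(
        task_id=f"trigger_{f.stem}",
        trigger_dag_id=PROCESSING_DAG_ID,
        conf={"file": str(Path(STAGING_DIR) / f.name)},
    )
```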
The Move task moves the files that led to the task generation, so the next DAG run won't have FileSensor re-trigger either the Move or TriggerDAG tasks for those files. In fact, the scheduler won't generate the tasks for those files at all: once every file has gone through Move, the input directory has nothing left to iterate over.
This gives rise to two problems:
- After execution, the task logs and renderings are no longer available. The Graph View only shows the DAG as it is now (empty), not as it was at runtime. (The Tree View shows the tasks' runs and states, but clicking on a "square" and picking any detail leads to an Airflow error.)
- The downstream tasks can be memory-holed due to a race condition. The first task is to move the originating file to a staging area; if that takes longer than the scheduler's polling period, the scheduler no longer collects the downstream TriggerDAG(File1) task, which means that task is not scheduled to be executed even though the upstream task ran successfully. It's as if the downstream task never existed.
The race condition issue is solved by changing the task sequence to Copy(File1) -> TriggerDAG(File1) -> Remove(File1), but the broader problem remains: is there a way to persist dynamically generated tasks, or at least a way to consistently access them through the Airflow interface?
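For completeness, the reordered loop looks roughly like this; generate_copy_task and generate_remove_task are hypothetical helpers in the same style as the move helper above (imports and constants as in the earlier sketch):

```python
def generate_copy_task(f: Path) -> BashOperator:
    # Copy instead of move, so the source file still exists when the scheduler
    # re-parses the DAG and collects the downstream tasks.
    return BashOperator(
        task_id=f"copy_{f.stem}",
        bash_command=f"cp '{f}' '{STAGING_DIR}/'",
    )

def generate_remove_task(f: Path) -> BashOperator:
    # Remove the original only after the downstream DAG has been triggered.
    return BashOperator(
        task_id=f"remove_{f.stem}",
        bash_command=f"rm '{f}'",
    )

with dag:
    for filepath in Path(WATCH_DIR).glob("*"):
        (
            sensor_task
            >> generate_copy_task(filepath)
            >> generate_dag_trigger(filepath)
            >> generate_remove_task(filepath)
        )
```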