2

I have a DAG that, whenever there are files detected by FileSensor, generates tasks for each file to (1) move the file to a staging area, (2) trigger a separate DAG to process the file.

FileSensor -> Move(File1) -> TriggerDAG(File1) -> Done
          |-> Move(File2) -> TriggerDAG(File2) -^

In the DAG definition file, the middle tasks are generated by iterating over the directory that FileSensor is watching, a bit like this:

# def generate_move_task(f: Path) -> BashOperator
# def generate_dag_trigger(f: Path) -> TriggerDagRunOperator

with dag:
  for filepath in Path(WATCH_DIR).glob(*):
    sensor_task >> generate_move_task(filepath) >> generate_dag_trigger(filepath)

The Move task moves the files that lead to the task generation, so the next DAG run won't have FileSensor re-trigger either Move or TriggerDAG tasks for this file. In fact, the scheduler won't generate the tasks for this file at all, since after all files go through Move, the input directory has no contents to iterate over anymore..

This gives rise to two problems:

  1. After execution, the task logs and renderings are no longer available. The Graph View only shows the DAG as it is now (empty), not as it was at runtime. (The Tree View shows that the tasks' run and state, but clicking on the "square" and picking any details leads to an Airflow error.)
  2. The downstream tasks can be memory-holed due to a race condition. The first task is to move the originating file to a staging area. If that takes longer than the scheduler polling period, the scheduler no longer collects the downstream TriggerDAG(File1) task, which means that task is not scheduled to be executed even though the upstream task ran successfully. It's as if the downstream task never existed.

The race condition issue is solved by changing the task sequence to Copy(File1) -> TriggerDAG(File1) -> Remove(File1), but the broader problem remains: is there a way to persist dynamically generated tasks, or at least a way to consistently access them through the Airflow interface?

Simon Podhajsky
  • 307
  • 1
  • 3
  • 13

2 Answers2

3

While it isn't clear, i'm assuming that downstream DAG(s) that you trigger via your orchestrator DAG are NOT dynamically generated for each file (like your Move & TriggerDAG tasks); in other words, unlike your Move tasks that keep appearing and disappearing (based on files), the downstream DAGs are static and stay there always


You've already built a relatively complex workflow that does advanced stuff like generating tasks dynamically and triggering external DAGs. I think with slight modification to your DAGs structure, you can get rid of your troubles (which also are quite advanced IMO)

  1. Relocate the Move task(s) from your upstream orchestrator DAG to the downstream (per-file) process DAG(s)
  2. Make the upstream orchestrator DAG do two things
  3. Sense / wait for files to appear
  4. For each file, trigger the downstream processing DAG (which in effect you are already doing).

For the orchestrator DAG, you can do it either ways

  1. have a single task that does file sensing + triggering downstream DAGs for each file
  2. have two tasks (I'd prefer this)
    • first task senses files and when they appear, publishes their list in an XCOM
    • second task reads that XCOM and foreach file, triggers it's corresponding DAG

but whatever way you choose, you'll have to replicate the relevant bits of code from

  • FileSensor (to be able to sense file and then publish their names in XCOM) and
  • TriggerDagRunOperator (so as to be able to trigger multiple DAGs with single task)

here's a diagram depicting the two tasks approach

enter image description here

y2k-shubham
  • 10,183
  • 11
  • 55
  • 131
  • 1
    Thank you for your detailed response! My concern with moving the Move task from orchestrator to process DAG is idempotence: if every process DAG starts with `mv $origin $staging`, then they are not fully re-runnable. But I think patching the list of files down from `FileSensor` (or maybe just a downstream `PythonOperator`) should easily allow me to do `Move` in the orchestrator prior to the `MultiTriggerDagRun` (which seems like the difficult part now, though [this SO answer](https://stackoverflow.com/a/51790697/2114580) should help) – Simon Podhajsky Aug 15 '20 at 00:36
  • A hacky way to still let your downstream DAGs exhibit idempotency (sort of) would be to modify your *Move* tasks to act like *Sense-Then-Move* (that is move only if file-to-be processed is NOT present at destination) – y2k-shubham Aug 15 '20 at 02:00
1

The short answer to the title question is, as of Airflow 1.10.11, no, this doesn't seem possible as stated. To render DAG/task details, the Airflow webserver always consults the DAGs and tasks as they are currently defined and collected to DagBag. If the definition changes or disappears, tough luck. The dashboard just shows the log entries in the table; it doesn't probe the logs for prior logic (nor does it seem to store much of it other than the headline).

y2k-shubham provides an excellent solution to the unspoken question of "how can I write DAGs/tasks so that the transient metadata are accessible". The subtext of his solution: convert the transient metadata into something Airflow stores per task run, but keep the tasks themselves fixed. XCom is the solution he uses here, and it does shows up in the task instance details / logs.

Will Airflow implement persistent interface access to fleeting one-time tasks whose definition disappears from the DagBag? It's possible but unlikely, for two reasons:

  1. It would require the webserver to probe the historical logs instead of just the current DagBag when rendering the dashboard, which would require extra infrastructure to keep the web interface snappy, and could make the display very confusing.
  2. As y2k-shubham notes in a comment to another question of mine, fleeting and changing tasks/DAGs are an Airflow anti-pattern. I'd imagine that would make this a tough sell as the next feature.
Simon Podhajsky
  • 307
  • 1
  • 3
  • 13