
I have an upload folder that gets irregular uploads. For each uploaded file, I want to spawn a DAG that is specific to that file.

My first thought was to do this with a FileSensor that monitors the upload folder and, conditional on presence of new files, triggers a task that creates the separate DAGs. Conceptually:

Sensor_DAG (FileSensor -> CreateDAGTask)

|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)

In my initial implementation, CreateDAGTask was a PythonOperator that created the DAGs by placing them in the global namespace (see this SO answer), like so:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime, timedelta
from pathlib import Path

UPLOAD_LOCATION = "/opt/files/uploaded"

# Dynamic DAG generation task code, for the Sensor_DAG below
def generate_dags_for_files(location=UPLOAD_LOCATION, **kwargs):
    dags = []
    for filepath in Path(location).glob('*'):
        dag_name = f"process_{filepath.name}"
        dag = DAG(dag_name, schedule_interval="@once", default_args={
            "depends_on_past": True,
            "start_date": datetime(2020, 7, 15),
            "retries": 1,
            "retry_delay": timedelta(hours=12)
        }, catchup=False)
        dag_task = DummyOperator(dag=dag, task_id=f"start_{dag_name}")

        dags.append(dag)

        # Try to place the DAG into globals(), which doesn't work
        globals()[dag_name] = dag

    return dags

The main DAG then invokes this logic via a PythonOperator:

# File-sensing DAG
default_args = {
    "depends_on_past" : False,
    "start_date"      : datetime(2020, 7, 16),
    "retries"         : 1,
    "retry_delay"     : timedelta(hours=5),
}
with DAG("Sensor_DAG", default_args=default_args,
         schedule_interval= "50 * * * *", catchup=False, ) as sensor_dag:

    start_task  = DummyOperator(task_id="start")
    stop_task   = DummyOperator(task_id="stop")
    sensor_task = FileSensor(task_id="my_file_sensor_task",
                             poke_interval=60,
                             filepath=UPLOAD_LOCATION)
    process_creator_task = PythonOperator(
        task_id="process_creator",
        python_callable=generate_dags_for_files,
    )
    start_task >> sensor_task >> process_creator_task >> stop_task

But that doesn't work, because by the time process_creator_task runs, Airflow has already parsed the module's globals. Globals added after parse time are never picked up.

Interim solution

Per Airflow dynamic DAG and task Ids, I can achieve what I'm trying to do by omitting the FileSensor task altogether and just letting Airflow generate the per-file DAGs at each scheduler heartbeat, replacing the Sensor_DAG with a module-level call:

Update: never mind -- while this does create a DAG in the dashboard, actual execution runs into the "DAG seems to be missing" issue:

generate_dags_for_files()

This does mean that I can no longer regulate the frequency of folder polling with the poke_interval parameter of FileSensor; instead, Airflow will poll the folder every time it collects DAGs.

Is that the best pattern here?

Other related StackOverflow threads

  • https://www.astronomer.io/guides/dynamically-generating-dags/ – drum Jul 17 '20 at 22:27
  • @drum I re-read that article; while they do generate DAGs dynamically at parse time, I don't believe any of the examples have an **Airflow task** create a DAG dynamically. – Simon Podhajsky Jul 17 '20 at 22:40
  • **@Simon Podhajsky** since Python is an interpreter-based language, this can certainly be achieved via **meta-programming**: if your `task` programmatically generates and writes a valid DAG file in Airflow's DAG directory, within a few minutes it will be picked up and show up in the UI. Now it really doesn't have to be as hard as generating `python` code: if you write a python script (DAG-definition file) that generates DAGs (with pre-defined structure / tasks) based on JSON input read from a `Variable`, then just updating the `Variable` via your Airflow task would produce a new DAG. – y2k-shubham Jul 18 '20 at 19:46
  • I think [this answer](https://stackoverflow.com/a/55896121/3679900) comes closest to what you want: he isn't generating a new DAG at runtime, but he is adding new downstream `task`s to the DAG at runtime (an upstream task updates a `Variable`, causing new downstream tasks to appear) – y2k-shubham Jul 18 '20 at 19:51
  • 1
  • but a word of caution here: based on my experience with Airflow (over and above building and maintaining an ETL platform), I'd recommend staying away from this *magic* as much as possible. IMO `DAG`s should be immutable & long-lasting / predictable: **[i]** DAGs shouldn't magically appear and disappear and **[ii]** even worse, `task`s in a DAG shouldn't appear and vanish. Otherwise, your entire book-keeping (history of DAGs, tasks) will go for a toss and the observability / debuggability of the system will be compromised – y2k-shubham Jul 18 '20 at 19:52
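A minimal sketch of the Variable-driven setup described in the comment above, using the same Airflow 1.10-style imports as the question. The Variable name (`uploaded_files`) and the one-task DAG body are illustrative only: the sensing task just records file names into the Variable, and a module-level loop in a DAG-definition file turns each recorded name into a DAG at parse time.

import json
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator

UPLOAD_LOCATION = "/opt/files/uploaded"

# Runs inside Sensor_DAG (in place of generate_dags_for_files): it only
# records what it saw; no DAG objects are created at task run time.
def register_uploaded_files(location=UPLOAD_LOCATION, **kwargs):
    file_names = sorted(p.name for p in Path(location).glob("*"))
    Variable.set("uploaded_files", json.dumps(file_names))

# Module level, so it runs every time the scheduler parses this file:
# each recorded name becomes a DAG with a fixed, pre-defined structure.
for file_name in json.loads(Variable.get("uploaded_files", default_var="[]")):
    dag_id = f"process_{file_name}"
    dag = DAG(dag_id, schedule_interval="@once", default_args={
        "start_date": datetime(2020, 7, 15),
        "retries": 1,
        "retry_delay": timedelta(hours=12),
    }, catchup=False)
    with dag:
        DummyOperator(task_id="start") >> DummyOperator(task_id="finish")
    globals()[dag_id] = dag  # registered at parse time, so the DagBag sees it

One caveat: reading a Variable at the top level of a DAG file means a metadata-database query on every parse, which is part of why the comments advise using this kind of setup sparingly.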

2 Answers


In short: if the task writes where the DagBag reads from, yes, but it's best to avoid a pattern that requires this. Any DAG you're tempted to custom-create in a task should probably instead be a static, heavily parametrized, conditionally-triggered DAG. y2k-shubham provides an excellent example of such a setup, and I'm grateful for his guidance in the comments on this question.

That said, here are the approaches that would accomplish what the question is asking, however bad an idea it may be, in increasing order of ham-handedness:

  • If you dynamically generate DAGs from a Variable (like so), modify the Variable.
  • If you dynamically generate DAGs from a list of config files, add a new config file to wherever you're pulling config files from, so that a new DAG gets generated on the next DAG collection.
  • Use something like Jinja templating to write a new Python file in the dags/ folder (a rough sketch follows this list).
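For that last bullet, a rough sketch of the file-writing variant. The path, the template text, and the helper name are placeholders (and a real setup would likely use Jinja proper rather than str.format): a task renders a small template and drops a standalone DAG file into the dags/ folder, where the scheduler picks it up on a later parse.

from pathlib import Path

DAGS_FOLDER = Path("/opt/airflow/dags")  # assumption: wherever your dags/ folder lives

DAG_FILE_TEMPLATE = '''\
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG("process_{file_name}", schedule_interval="@once",
         start_date=datetime(2020, 7, 15), catchup=False) as dag:
    DummyOperator(task_id="start") >> DummyOperator(task_id="finish")
'''

def write_dag_file(file_name, dags_folder=DAGS_FOLDER):
    """Write a self-contained DAG definition for one uploaded file."""
    target = dags_folder / f"process_{file_name}.py"
    target.write_text(DAG_FILE_TEMPLATE.format(file_name=file_name))
    return target

# e.g. from a PythonOperator callable that runs after the FileSensor fires:
# write_dag_file("some_upload.csv")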

To retain access to the new DAG and its tasks after they run, you'd have to keep the DAG definition stable and accessible across future dashboard updates / DagBag collections; otherwise, the Airflow dashboard won't be able to render much about it.

  • To anyone reading this, the answer above is wrong about Dynamic DAGs being a bad idea. Hear from the inventor of Airflow instead: https://www.youtube.com/watch?v=Fvu2oFyFCT0 – Yiannis Nov 29 '21 at 16:23

Airflow is well suited for building DAGs dynamically, as pointed out by its creator: https://youtu.be/Fvu2oFyFCT0?t=411 (P.S. thanks to @Yiannis for the video reference)

Here is an example of how this could be accomplished: https://docs.astronomer.io/learn/dynamically-generating-dags
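Roughly, the pattern that guide walks through is a DAG factory called in a loop at parse time, with each resulting DAG registered in globals(). A minimal sketch, with a placeholder input list standing in for whatever config source you'd actually loop over:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

def create_dag(dag_id, schedule, default_args):
    """Build one DAG with a fixed, pre-defined task structure."""
    dag = DAG(dag_id, schedule_interval=schedule,
              default_args=default_args, catchup=False)
    with dag:
        DummyOperator(task_id="start") >> DummyOperator(task_id="finish")
    return dag

# The loop input could just as well come from a Variable, a config file,
# or a directory listing -- anything available when this file is parsed.
for name in ["file_a", "file_b"]:
    dag_id = f"process_{name}"
    globals()[dag_id] = create_dag(dag_id, "@once",
                                   {"start_date": datetime(2020, 7, 15)})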
