
I want to make a parent DAG with a few child DAGs that get called via the SubDagOperator.

  • I can only find examples of how to dynamically create SubDAGs inside the SubDagOperator task.
  • However, in this case I want to use standalone child DAGs that are already defined in their own DAG .py files and stitch them together in a parent DAG.

If I create the SubDagOperator task with just the DAG name of the child DAG:

task_1 = SubDagOperator(
    task_id="task_1",
    subdag=child_dag_name,
    dag=parent_dag
)

I get the following Error:

NameError: name 'child_dag_name' is not defined
y2k-shubham
  • Does this answer your question? [Wiring top-level DAGs together](https://stackoverflow.com/questions/51325525/wiring-top-level-dags-together) – y2k-shubham May 10 '20 at 12:11
  • I had already found this question. According to your full answer there, there seems to be no quick and easy solution, so I might as well try the TriggerDagRunOperator option, which seems more scalable anyway. I want to trigger my first DAG once, and then trigger the second DAG once, exactly 24h later, if all tasks of the first one succeeded. Is this possible with TriggerDagRunOperator? – Tim M. Schendzielorz May 11 '20 at 20:25

1 Answer

This answer relies as much on knowledge of Python as it does on know-how of Airflow.

Recall that:

  • Python: importing a module immediately executes all of its top-level (zero-indentation) code during the import process
  • Airflow: the scheduler / webserver only picks up DAG objects that occur at the top level (zero indentation) of a DAG-definition file
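Both facts can be demonstrated without Airflow installed. The sketch below is a simplified, hypothetical model: `FakeDAG` stands in for `airflow.models.DAG`, and `collect_dags` only roughly mirrors how Airflow's DagBag scans a loaded module's global namespace for DAG instances (the real loader does a lot more). A DAG built inside a function that nobody calls at top level is never seen.

```python
import types
from typing import List


class FakeDAG:
    """Hypothetical stand-in for airflow.models.DAG; it only stores an id."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id


# Source of a hypothetical DAG-definition file.
DAG_FILE_SOURCE = '''
top_level_dag = FakeDAG("visible_dag")

def build_hidden_dag():
    # Only runs if someone calls it; NOT executed during import.
    return FakeDAG("hidden_dag")
'''


def load_dag_file(source: str, name: str) -> types.ModuleType:
    """Create a module and execute its source, as an import would."""
    module = types.ModuleType(name)
    module.__dict__["FakeDAG"] = FakeDAG  # make the stand-in class visible to the file
    exec(source, module.__dict__)  # every top-level statement runs here
    return module


def collect_dags(module: types.ModuleType) -> List[str]:
    """Return ids of DAG objects bound at the module's top level."""
    return [obj.dag_id for obj in vars(module).values() if isinstance(obj, FakeDAG)]


module = load_dag_file(DAG_FILE_SOURCE, "my_dag_file")
print(collect_dags(module))  # ['visible_dag']
```

Calling `module.build_hidden_dag()` afterwards still does not add anything to the module's top level, which is why the answer below calls the child-DAG factory once at top level in `child_dag.py`.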

Keeping the above two things in mind, here's what you can do:

  • create a helper / utility function in your child_dag.py file that instantiates and returns a DAG object for the child DAG
  • use that helper function both to instantiate the top-level child DAG and to create the SubDagOperator task

dag_object_builder.py

from typing import Dict, Any

from airflow.models import DAG


def create_dag_object(dag_id: str, dag_params: Dict[str, Any]) -> DAG:
    dag: DAG = DAG(dag_id=dag_id, **dag_params)
    return dag

child_dag.py

from datetime import datetime
from typing import Dict, Any

from airflow.models import DAG

from src.main.subdag_example import dag_object_builder

default_args: Dict[str, Any] = {
    "owner": "my_owner",
    "email": ["my_username@my_domain.com"],
    "weight_rule": "downstream",
    "retries": 1
}

...


def create_child_dag_object(dag_id: str) -> DAG:
    my_dag: DAG = dag_object_builder.create_dag_object(
        dag_id=dag_id,
        dag_params={
            "start_date": datetime(year=2019, month=7, day=10, hour=21, minute=30),
            "schedule_interval": None,
            "max_active_runs": 1,
            "default_view": "graph",
            "catchup": False,
            "default_args": default_args
        }
    )
    return my_dag


my_child_dag: DAG = create_child_dag_object(dag_id="my_child_dag")

parent_dag.py

from datetime import datetime
from typing import Dict, Any

from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator

from src.main.subdag_example import child_dag
from src.main.subdag_example import dag_object_builder

default_args: Dict[str, Any] = {
    "owner": "my_owner",
    "email": ["my_username@my_domain.com"],
    "weight_rule": "downstream",
    "retries": 1
}

my_parent_dag: DAG = dag_object_builder.create_dag_object(
    dag_id="my_parent_dag",
    dag_params={
        "start_date": datetime(year=2019, month=7, day=10, hour=21, minute=30),
        "schedule_interval": None,
        "max_active_runs": 1,
        "default_view": "graph",
        "catchup": False,
        "default_args": default_args
    }
)

...

my_subdag_task: SubDagOperator = SubDagOperator(
    task_id="my_subdag_task",
    dag=my_parent_dag,
    # SubDagOperator requires the subdag's dag_id to be "<parent_dag_id>.<task_id>"
    subdag=child_dag.create_child_dag_object(dag_id="my_parent_dag.my_subdag_task")
)
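SubDagOperator validates that the subdag's dag_id has the form `<parent_dag_id>.<task_id>` and raises an AirflowException otherwise (this is the Airflow 1.10-era behaviour; check it against your version). The helpers below, `subdag_dag_id` and `check_subdag_id`, are hypothetical pure-Python sketches of that naming rule, useful for deriving the id passed to `create_child_dag_object` instead of hard-coding it.

```python
from typing import Optional


def subdag_dag_id(parent_dag_id: str, task_id: str) -> str:
    """Build the dag_id SubDagOperator expects: '<parent_dag_id>.<task_id>'."""
    return f"{parent_dag_id}.{task_id}"


def check_subdag_id(parent_dag_id: str, task_id: str, subdag_id: str) -> Optional[str]:
    """Return an error message if subdag_id breaks the naming rule, else None."""
    expected = subdag_dag_id(parent_dag_id, task_id)
    if subdag_id != expected:
        return f"Expected {expected!r}; received {subdag_id!r}"
    return None


print(subdag_dag_id("my_parent_dag", "my_subdag_task"))  # my_parent_dag.my_subdag_task
print(check_subdag_id("my_parent_dag", "my_subdag_task", "my_subdag"))
```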

  • If your intention is to link DAGs together and you don't have any particular requirement that necessitates a SubDagOperator, then I would suggest using the TriggerDagRunOperator instead, since SubDAGs have their share of nuisances.
  • Read more about it here: Wiring top-level DAGs together
y2k-shubham