
I have looked at the Airflow subDAG section and tried to find anything else online that would be helpful, but I have not found anything that explains in detail how to make a subDAG work. One of the requirements for a subDAG to run is that it should be enabled. How do you enable/disable a subDAG?

I wrote some sample code that doesn't show any errors in Airflow, but when I try to run it, none of the operators in the subDAG get executed.

This is my main dag code:

import os
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airflow.operators.subdag_operator import SubDagOperator
from linecount_subdag import sub_dag

parent_dag_name = 'example_linecount_dag'
child_dag_name = 'example_linecount_subdag'

args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 04, 20),
    'retries': 0,
}
main_dag = DAG(
    dag_id=parent_dag_name,
    default_args=args,
    schedule_interval=timedelta(minutes=5),
    start_date=datetime(2016, 04, 20),
    max_active_runs=1
)

subdag = SubDagOperator(
    subdag=sub_dag(parent_dag_name, child_dag_name, args, main_dag.schedule_interval),
    task_id=child_dag_name,
    default_args=args,
    dag=main_dag)
t = BashOperator(
    task_id='start',
    bash_command='echo "waiting for subdag..."',
    default_args=args,
    dag=main_dag)
t.set_downstream(subdag)

In this code, the task 'start' succeeds, but the subdag task doesn't do anything: it neither fails nor succeeds.

Here is my subDAG code:

from airflow.models import DAG
from airflow.operators import BashOperator

# Dag is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval):
  dag = DAG(
    '%s.%s' % (parent_dag_name, child_dag_name),
    default_args=args,
    schedule_interval=schedule_interval,  # pass the parent's interval through instead of ignoring it
    start_date=args['start_date'],
    max_active_runs=1,
  )
  t1 = BashOperator(
    task_id='count_lines',
    bash_command='cat /root/airflow/airflow.cfg | wc -l',
    default_args=args,
    xcom_push=True,
    dag=dag)
  t2 = BashOperator(
    task_id='retrieve_val',
    bash_command='grep "airflow_home" /root/airflow/airflow.cfg',
    default_args=args,
    xcom_push=True,
    dag=dag)
  templated_command = """
    {
        echo "{{ ti.xcom_pull(task_ids='count_lines') }}"
        echo "{{ ti.xcom_pull(task_ids='retrieve_val') }}"
    }"""
  t3 = BashOperator(
    task_id='print_values',
    bash_command=templated_command,
    default_args=args,
    dag=dag)
  t3.set_upstream(t1)
  t3.set_upstream(t2)
  return dag

The three operators in this code count the lines of the file "airflow.cfg", find the value of "airflow_home" in that file, and print both of those values. This code works on its own, so I don't think it's the problem.

What do I have to change to make the subDAG execute its operators?


1 Answer


I used your code locally and it works fine.

The only things I changed were setting both the outer DAG and the sub DAG to schedule_interval=None and triggering them manually.
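
For concreteness, here is a minimal sketch of that change (assuming the rest of the question's code stays the same): pass schedule_interval=None to the parent DAG and through to the sub DAG factory, then start a run by hand.

main_dag = DAG(
    dag_id=parent_dag_name,
    default_args=args,
    schedule_interval=None,  # no automatic scheduling; runs only when triggered
    start_date=datetime(2016, 4, 20),
    max_active_runs=1
)

subdag = SubDagOperator(
    subdag=sub_dag(parent_dag_name, child_dag_name, args, None),  # sub dag unscheduled as well
    task_id=child_dag_name,
    default_args=args,
    dag=main_dag)

A manual run can then be started from the CLI with `airflow trigger_dag example_linecount_dag` (Airflow 1.x CLI) or with the "Trigger DAG" button in the web UI.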

Having a start date of datetime(2016, 04, 20) and a schedule_interval of 5 minutes will flood the Airflow scheduler with many backfill requests.
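
If you do keep the 5-minute schedule, a sketch of one way to limit that backfilling (only available from Airflow 1.8 onwards via the catchup DAG argument / catchup_by_default config, as the comment below notes):

main_dag = DAG(
    dag_id=parent_dag_name,
    default_args=args,
    schedule_interval=timedelta(minutes=5),
    start_date=datetime(2016, 4, 20),
    max_active_runs=1,
    catchup=False,  # Airflow 1.8+ only: don't create a run for every interval since start_date
)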

You might need to switch from using a LocalExecutor to a CeleryExecutor; the LocalExecutor is fairly limited.
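
If you do switch executors, the setting lives in airflow.cfg; a sketch of the relevant line (CeleryExecutor additionally assumes a running broker such as Redis or RabbitMQ and a result backend configured in the [celery] section):

[core]
# switch the executor used by the scheduler and workers
executor = CeleryExecutor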

Here is the output from the last step in the subdag:

[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask:       {
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask:           echo "226"
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask:           echo "airflow_home = /root/airflow/"
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask:       }
    As told by **@sage88** [here](https://stackoverflow.com/a/43122799/3679900), `Airflow 1.8+` provides a (boolean) configuration `catchup_by_default` (corresponding `DAG` param `catchup`) to override default *backfilling* behaviour – y2k-shubham Jul 04 '18 at 00:29