
I have a Python DAG `Parent Job` and a DAG `Child Job`. The tasks in the Child Job should be triggered on the successful completion of the Parent Job's tasks, which run daily. How can I add an external job trigger?

MY CODE

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Child_Job', default_args=default_args, schedule_interval='@daily')

execute_notebook = PostgresOperator(
    task_id='data_sql',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM athena_rs.shipments limit 5",
    dag=dag
)
aeapen
  • Does this answer your question? [How to set dependencies between DAGs in Airflow?](https://stackoverflow.com/questions/38022323/how-to-set-dependencies-between-dags-in-airflow) – moon Apr 30 '20 at 05:45
  • @LuckyGuess The linked example shows one task in one DAG triggering another task in another DAG. What I think he is looking for is the completion of one DAG as a whole triggering the next DAG. If you could show an example, it would be great. – pankaj Apr 30 '20 at 06:32
  • 2
    I'd highly recommend using [`TriggerDagRunOperator`](https://stackoverflow.com/a/43459863/3679900) to perform **reactive triggering** rather than `ExternalTaskSensor` to perform **poll-based triggering** – y2k-shubham May 01 '20 at 20:35
  • @y2k-shubham, if you could write an example like the ones written below, it would be a learning resource for others as well. I am also facing the same issue. – pankaj May 02 '20 at 05:01
  • 1
    **@pankaj** I've added an answer depicting usage of `TriggerDagRunOperator` – y2k-shubham May 02 '20 at 20:29

3 Answers


The answer is in the thread linked in the comments above; below is demo code:

Parent dag:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

leave_work = DummyOperator(
    task_id='leave_work',
    dag=dag,
)
cook_dinner = DummyOperator(
    task_id='cook_dinner',
    dag=dag,
)

leave_work >> cook_dinner

Child dag:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

# Use ExternalTaskSensor to listen to the Parent_dag and cook_dinner task
# when cook_dinner is finished, Child_dag will be triggered
wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    start_date=datetime(2020, 4, 29),
    execution_delta=timedelta(hours=1),
    timeout=3600,
    dag=dag,
)

have_dinner = DummyOperator(
    task_id='have_dinner',
    dag=dag,
)
play_with_food = DummyOperator(
    task_id='play_with_food',
    dag=dag,
)

wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food
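
A note on the sensor parameters: `timeout=3600` gives the sensor only one hour of poking before it fails, and `execution_delta=timedelta(hours=1)` makes it look for the parent's run one hour earlier than the child's execution date, so it only belongs here if the two schedules really are offset by an hour. Below is a hedged variant for the common case where both DAGs share the same `@daily` schedule and the parent task can run long; the three-hour timeout is an assumption, not tested, and `mode='reschedule'` requires Airflow 1.10.2+:

wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    mode='reschedule',   # release the worker slot between pokes (Airflow >= 1.10.2)
    timeout=3 * 3600,    # assumption: allow up to 3 hours for the parent task
    dag=dag,
)

With identical schedules and no `execution_delta`, the sensor pokes for the parent task instance whose execution date matches the child run's.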

Images (screenshots omitted): the DAGs list, the Parent_dag graph view, and the Child_dag graph view.

moon
  • I tried this and I am getting a timeout error: `[MainThread] INFO airflow.task.operators - [2020-05-01 09:51:14,444] {{external_task_sensor.py:115}} Poking for RS_Input_Cleansing.events_input_sql on 2020-04-29T23:00:00+00:00 ... [MainThread] ERROR airflow.task - [2020-05-01 09:51:14,508] {{taskinstance.py:1088}} Snap. Time is OUT. Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 955, in _run_raw_task result = task_copy.execute(context=context)` – aeapen May 01 '20 at 11:58
  • If the `Parent Dag` is already in success status, will the `Child Dag` run if I trigger it manually? – aeapen May 01 '20 at 12:54
  • You probably need to open another question and post the full log. But to answer your question: yes, if `RS_Input_Cleansing.events_input_sql` finishes on time, the child dag will run automatically – moon May 01 '20 at 16:03
  • The last task of the parent dag took 2 hours to complete. Is that the reason for the timeout, since we gave 1 hour for the timeout? – aeapen May 01 '20 at 16:28
  • yea, increase the `timeout`. Now it's set to 3600 sec which is 1 hour. – moon May 01 '20 at 17:08
  • Be aware that I also set `execution_delta` and `start_date`, you should probably remove them if you don't need them. – moon May 01 '20 at 17:12
  • @LuckyGuess, can you have a look at this question: https://stackoverflow.com/questions/61555430/how-to-do-store-sql-output-to-pandas-dataframe-using-airflow/61562437#61562437? I really liked your explanation style with examples – Ria Alves May 03 '20 at 04:35
  • This will not work if the Child Dag knows nothing about the Parent Dag. A better solution would be to make it more event-driven – Spencer Sutton Dec 17 '20 at 17:35

As requested by @pankaj, I'm hereby adding a snippet depicting reactive triggering using `TriggerDagRunOperator` (as opposed to poll-based triggering with `ExternalTaskSensor`):

from typing import List

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule

# DAG object
my_dag: DAG = DAG(dag_id='my_dag',
                  start_date=..)
..
# a list of 'tail' tasks: tasks that have no downstream tasks
tail_tasks_of_first_dag: List[BaseOperator] = my_magic_function_that_determines_all_tail_tasks(..)
..

# our trigger task
my_trigger_task: TriggerDagRunOperator = TriggerDagRunOperator(dag=my_dag,
                                                               task_id='my_trigger_task',
                                                               trigger_rule=TriggerRule.ALL_SUCCESS,
                                                               trigger_dag_id='id_of_dag_to_be_triggered')
# our trigger task should run when all 'tail' tasks have completed / succeeded
tail_tasks_of_first_dag >> my_trigger_task
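
For concreteness, here is one possible implementation of the hypothetical `my_magic_function_that_determines_all_tail_tasks` helper above; this is a sketch of my own, not part of the original answer:

def my_magic_function_that_determines_all_tail_tasks(dag: DAG) -> List[BaseOperator]:
    # a 'tail' task is one with no downstream tasks in the given DAG
    return [task for task in dag.tasks if not task.downstream_list]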

Note that the snippet is for reference purposes only; it has NOT been tested
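
Also, per the comment thread below, the DAG being triggered would typically be declared with `schedule_interval=None`, so that Airflow never schedules it on its own and it runs only when triggered (a minimal sketch; the `dag_id` is assumed to match `trigger_dag_id` above):

from airflow.models.dag import DAG
from airflow.utils.dates import days_ago

# schedule_interval=None: this DAG runs only when TriggerDagRunOperator
# (or a manual trigger) fires it, never on a schedule of its own
dag_to_be_triggered: DAG = DAG(dag_id='id_of_dag_to_be_triggered',
                               start_date=days_ago(1),
                               schedule_interval=None)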



y2k-shubham
  • It worked. My child DAG ran on the success of the parent. I still have a doubt: my child DAG is `dag = DAG('Child', default_args=default_args, catchup=False, schedule_interval='@daily')`. My parent DAG is scheduled to run at 8:30 AM. The child job runs after the parent DAG finishes its 8:30 AM run, and it also runs again at 12:00 AM. Am I missing something in the DAG? – aeapen May 03 '20 at 05:13
  • **@aeapen** you might want to set the `schedule_interval` of your child DAG to `None`. That way it wouldn't ever be run automatically by Airflow, and would only be triggered upon completion of the parent DAG – y2k-shubham May 03 '20 at 05:19

I believe you are looking for the SubDagOperator: running a DAG inside a bigger DAG. Note that creating many subdags like in the example below gets messy pretty quickly, so I recommend splitting each subdag into its own file and importing them in a main file.

The SubDagOperator is simple to use: you need to give it a task_id, a subdag (the child), and a dag (the parent):

subdag_2 = SubDagOperator(
    task_id="just_some_id",
    subdag=child_subdag,  # this must be a DAG
    dag=parent_dag,       # this must be a DAG
)

It will look like this (screenshot omitted).

From the Airflow examples repo:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago


def subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )

    # create five placeholder tasks inside the subdag
    for i in range(5):
        DummyOperator(
            task_id='%s-task-%s' % (child_dag_name, i + 1),
            default_args=args,
            dag=dag_subdag,
        )

    return dag_subdag


DAG_NAME = 'example_subdag_operator'

args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval="@once",
    tags=['example'],
)

start = DummyOperator(
    task_id='start-of-main-job',
    dag=dag,
)

some_other_task = DummyOperator(
    task_id='some-other-task',
    dag=dag,
)

end = DummyOperator(
    task_id='end-of-main-job',
    dag=dag,
)

# the SubDagOperator runs the whole child DAG as a single task of the parent
subdag_task = SubDagOperator(
    task_id='run-this-dag-after-previous-steps',
    subdag=subdag(DAG_NAME, 'run-this-dag-after-previous-steps', args),
    dag=dag,
)

start >> some_other_task >> end >> subdag_task
  • Thanks for this. I am looking for another case; I might have confused you. The Parent Dag is basically one job and the Child Dag is another job. I want to trigger the Child after the successful completion of the first job. If you could help me with that, it would be nice – aeapen Apr 30 '20 at 12:47
  • @aeapen what do you mean by `job`, is it a task or a DAG? This code is doing exactly what you are describing... The only thing you would do to match your case is add the Parent Dag at the end of this pipeline. I'll update the dag to demonstrate it... But the mindset is: instead of having two independent DAGs, you have nested DAGs, Parent -> Child... this makes more sense to me – Bernardo stearns reisen Apr 30 '20 at 12:57
  • the other approach would be triggering one DAG from another DAG using ExternalTaskSensor operator, which is much more confusing in my opinion – Bernardo stearns reisen Apr 30 '20 at 12:58
  • @aeapen I updated my solution, does this suit what you want better? – Bernardo stearns reisen Apr 30 '20 at 13:08
  • I mean a job is a DAG, which contains multiple tasks. – aeapen Apr 30 '20 at 14:45
  • So do I need to use SubDagOperator or ExternalTaskSensor for this purpose? – aeapen Apr 30 '20 at 14:46
  • You can potentially do what you want with either of them; I suggest going with subdags... it's much more maintainable – Bernardo stearns reisen Apr 30 '20 at 14:55
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/212872/discussion-between-aeapen-and-bernardo-stearns-reisen). – aeapen Apr 30 '20 at 15:27
  • I don't need to add `start`, `some_other_task` and `end`, right? You added them just for illustration – aeapen Apr 30 '20 at 15:29
  • @aeapen yes... I put them in to illustrate what the Parent DAG does before you call the Child DAG. – Bernardo stearns reisen Apr 30 '20 at 15:32
  • I am kind of confused. Can you explain what the for loop with DummyOperator does? – aeapen Apr 30 '20 at 15:39
  • It just creates 5 operators inside of the subdag... I suggest copying and pasting this code into a dag file and checking it in Airflow – Bernardo stearns reisen Apr 30 '20 at 15:40
  • you can click on the subdag and check the subdag's tasks in the Airflow UI – Bernardo stearns reisen Apr 30 '20 at 15:41
  • If the `Parent Dag` is already in success status, will the `Child Dag` run if I trigger it manually? Here I am using the external sensor method – aeapen May 01 '20 at 12:31
  • @aeapen if you add the subdag inside the parent dag, it will only flag the parent dag as success when the subdag finishes... I think you are still confused about some major concepts in airflow; if you want, describe your scenario for me in the chat – Bernardo stearns reisen May 01 '20 at 15:25
  • @Bernardostearnsreisen, can you have a look at https://stackoverflow.com/questions/61555430/how-to-do-store-sql-output-to-pandas-dataframe-using-airflow/61562437#61562437? I really liked your explanation with examples here – Ria Alves May 03 '20 at 04:36