
I have two DAGs that need to run at different frequencies: dag1 needs to run weekly, and dag2 needs to run daily. On every occasion that dag1 runs, dag2 should only run after dag1 has finished.

I have defined the two DAGs as follows, in two different Python modules.

dag1.py

import datetime as dt
from os import path

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag1',
         default_args={
             'owner': 'airflow',
             'start_date': dt.datetime(2019, 8, 19, 9, 30, 00),
             'concurrency': 1,
             'retries': 0
         },
         schedule_interval='00 10 * * 1',
         catchup=True
        ) as dag:

    CRAWL_PARAMS = BashOperator(
        task_id='crawl_params',
        bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
    )

dag2.py

import datetime as dt
from os import path

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag2',
         default_args={
             'owner': 'airflow',
             'start_date': dt.datetime(2019, 8, 25, 9, 30, 00),
             'concurrency': 1,
             'retries': 0
         },
         schedule_interval='5 10 * * *',
         catchup=True
        ) as dag:

    CRAWL_DATASET = BashOperator(
        task_id='crawl_dataset',
        bash_command='''
            cd {}/scraper && scrapy crawl crawl_dataset
        '''.format(PROJECT_PATH)
    )

For now I have manually set a gap of 5 minutes between the two DAGs. This setup is not working, and it also lacks a way to make dag2 dependent on dag1 as required.

I have checked the answers here and here but was not able to figure it out.

NOTE: the schedule_intervals are indicative only. The intention is to run dag1 every Monday at a fixed time and to run dag2 daily at a fixed time; on Mondays, dag2 should run only after dag1 finishes. Each DAG also has multiple tasks.

curioswati

3 Answers

  1. The easiest solution would be to begin your 2nd DAG with an ExternalTaskSensor that waits for completion of the last task of your 1st DAG (see the sketch after this list)
  2. Alternatively, you can trigger your 2nd DAG at the end of the 1st DAG using a TriggerDagRunOperator. In this case, however, you wouldn't be able to assign a schedule_interval to the 2nd DAG (since it would be 'forcefully' triggered by the 1st DAG)
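
A minimal sketch of option 1, assuming the indicative schedules from the question; the sensor's task id, timeout, and execution_delta value below are illustrative assumptions, not part of the original setup (the delta has to map dag2's execution date onto the dag1 run it should wait for, and on days without a matching dag1 run the sensor will simply wait until it times out):

import datetime as dt
from os import path

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag2',
         default_args={'owner': 'airflow', 'start_date': dt.datetime(2019, 8, 25)},
         schedule_interval='5 10 * * *',
         catchup=False
        ) as dag:

    # Wait for dag1's last task. ExternalTaskSensor matches runs by execution
    # date; with the question's indicative schedules, the dag1 run finishing on
    # the same Monday has an execution date 6 days and 5 minutes earlier than
    # dag2's, hence the execution_delta below (adjust it, or use
    # execution_date_fn, for the real schedules).
    WAIT_FOR_DAG1 = ExternalTaskSensor(
        task_id='wait_for_crawl_params',
        external_dag_id='dag1',
        external_task_id='crawl_params',
        execution_delta=dt.timedelta(days=6, minutes=5),
        timeout=6 * 60 * 60
    )

    CRAWL_DATASET = BashOperator(
        task_id='crawl_dataset',
        bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH)
    )

    WAIT_FOR_DAG1.set_downstream(CRAWL_DATASET)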
y2k-shubham
  • Thanks @y2k-shubham. I tried TriggerDagRunOperator, but as is clear it won't fulfill the purpose of running `dag2` daily. I will try to figure out something with ExternalTaskSensor, though. – curioswati Aug 27 '19 at 08:00

You could write the two tasks in the same DAG and set a downstream dependency between them:

task1.set_downstream(task2)

As for the different schedules, create the DAG with a daily schedule. For the task that should run weekly, use a ShortCircuitOperator to enable a weekly trigger:

from airflow.operators.python_operator import ShortCircuitOperator

# Set trigger for the first day of the week
def check_trigger_week(execution_date, **kwargs):
    return execution_date.weekday() == 0

# Task should check the trigger to see if it is the first day of the week
check_trigger_weekly = ShortCircuitOperator(
    task_id='check_trigger_weekly',
    python_callable=check_trigger_week,
    provide_context=True,
    dag=dag
)

Then make your weekly task dependent on this weekly trigger:

check_trigger_weekly.set_downstream(task)
Saurav Ganguli
  • Thanks @Saurav, this seems closer to a solution. But somehow, if I use the ShortCircuitOperator and the task is skipped (because the condition didn't succeed), the dependent tasks are also skipped, which is not what I expect, since task1 will only run weekly and task2 has to run daily (including when task1 is skipped). Any clue on that? – curioswati Aug 27 '19 at 11:31
  • You could use a conditional task. As per my first (not the best) idea, write the daily task twice, in a way: once so it runs when the weekly task fails, and once when it passes. https://stackoverflow.com/questions/43678408/how-to-create-a-conditional-task-in-airflow – Saurav Ganguli Aug 27 '19 at 12:53
  • I instead used the trigger rule 'none_failed', inspired by [here](https://stackoverflow.com/a/55743947/3860168), but it doesn't work; in that case no trigger rule works. I also tried 'all_done', which should let it run irrespective of the result of the previous task, but that didn't work either. – curioswati Aug 27 '19 at 14:23
  • Thanks @Saurav for giving me the right direction. Though the ShortCircuitOperator didn't work for me due to the same issue I mentioned above regarding the task getting skipped, using the same logic I figured out the solution with BranchPythonOperator. Posting the solution below. Thanks a lot! – curioswati Aug 28 '19 at 10:20

After a lot of struggle with understanding the flow, I finally came up with the answer myself (not sure how optimal it is, but it works for me currently). Thanks to this answer and the branching docs. Here is my solution using BranchPythonOperator.

dag1.py

import datetime as dt
from os import path

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

DEFAULT_ARGS = {
    'owner': 'airflow',
    'start_date': dt.datetime(2019, 8, 20),
    'concurrency': 1,
    'retries': 0
}

def branch_tasks(execution_date, **kwargs):
    '''
    Branch the tasks based on weekday.
    '''
    # check if the execution day is 'Saturday'
    if execution_date.weekday() == 5:
        return ['crawl_params', 'crawl_dataset']

    return 'crawl_dataset'

with DAG('dag1',
         default_args=DEFAULT_ARGS,
         schedule_interval='00 10 * * *',
         catchup=False
        ) as dag:

    CRAWL_PARAMS = BashOperator(
        task_id='crawl_params',
        bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
    )

    CRAWL_DATASET = BashOperator(
        task_id='crawl_dataset',
        bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH),
        trigger_rule='none_failed'
    )

    BRANCH_OP = BranchPythonOperator(
        task_id='branch_tasks',
        provide_context=True,
        python_callable=branch_tasks
    )

    BRANCH_OP.set_downstream([CRAWL_PARAMS, CRAWL_DATASET])
    CRAWL_PARAMS.set_downstream(CRAWL_DATASET)

Here, the BranchPythonOperator uses the branch_tasks function to choose which tasks to run based on the day of the week.
Another catch is that when crawl_params runs (its condition is true), its downstream tasks also run; but when it is skipped, its downstream tasks are skipped as well. To avoid this, we need to pass trigger_rule='none_failed' to the downstream task's operator, which means the task should run if none of its upstream tasks have failed (they either succeeded or were skipped).

curioswati