
I have a scenario like the one below:

  1. Trigger Task 1 and Task 2 only when new data is available for them in the source table (Athena). The trigger for Task 1 and Task 2 should happen when a new data partition arrives for the day.
  2. Trigger Task 3 only on the completion of Task 1 and Task 2
  3. Trigger Task 4 only on the completion of Task 3


My code:

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)



execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
)



execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)

execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)

execute_Task4.set_upstream(execute_Task3)

What is the optimal way of achieving this?


1 Answer


I believe your question addresses two major problems:

  1. Forgetting to configure the schedule_interval explicitly, so @daily is setting up something you're not expecting.
  2. How to properly trigger and retry the execution of the dag when you depend on an external event to complete the execution.

The short answer: set your schedule_interval explicitly using a cron expression, and use sensor operators to check from time to time.

default_args = {
    # enough retries to cover the whole checking window
    # (endtime and starttime are hours of the day, poke_interval is in seconds)
    'retries': int((endtime - starttime) * 60 * 60 / poke_interval)
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
     ....
    poke_interval=60 * 5,  # <---- time in seconds between each check
    dag=dag)

where starttime is the time your daily run starts, endtime is the last time of day you should check whether the event happened before flagging the run as failed, and poke_interval is the interval (in seconds) at which your sensor checks whether the event has happened.
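For example (these numbers are just an illustration, not values from your setup), a window from 10:00 to 16:00 with a 5-minute poke interval works out to 72 retries:

starttime = 10          # hour of day the checking window opens
endtime = 16            # last hour of day to keep checking
poke_interval = 60 * 5  # seconds between checks

retries = int((endtime - starttime) * 60 * 60 / poke_interval)
print(retries)  # 72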

What the cron schedule actually is whenever you set your dag to @daily like you did:

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

from the docs, you can see what you are actually doing: @daily - Run once a day at midnight

This explains why you're getting a timeout error and why the run fails after about 5 minutes: you set 'retries': 1 and 'retry_delay': timedelta(minutes=5). So it tries running the dag at midnight and the sensor fails, retries 5 minutes later, fails again, and the run is flagged as failed.

So @daily is basically setting an implicit cron schedule of:

@daily -> Run once a day at midnight -> 0 0 * * *

The cron format is shown below; you set a field to * whenever you want to say "all".

Minute Hour Day_of_Month Month Day_of_Week

So @daily is basically saying: run this every minute 0, hour 0, of all days_of_month, of all months, of all days_of_week.

Your case is: run this every minute 0, hour 10, of all days_of_month, of all months, of all days_of_week. This translates into cron format as:

0 10 * * *
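If you want to sanity-check a cron expression, croniter (a library Airflow itself depends on for scheduling) can print the next run time. This is just an illustrative snippet, not part of your dag:

from datetime import datetime
from croniter import croniter

base = datetime(2020, 4, 22)
for expr in ('0 0 * * *', '0 10 * * *'):  # @daily vs. the explicit 10:00 schedule
    print(expr, '->', croniter(expr, base).get_next(datetime))
# 0 0 * * *  -> 2020-04-23 00:00:00
# 0 10 * * * -> 2020-04-22 10:00:00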

How to properly trigger and retry the execution of the dag when you depend on an external event to complete the execution

  1. You could trigger a dag in airflow from an external event by using the command airflow trigger_dag. This would be possible if somehow you could trigger a lambda function / python script to target your airflow instance.

  2. If you can't trigger the dag externally, then use a sensor operator like you did, set a poke_interval on it and set a reasonably high number of retries; see the sketch below.
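Putting the pieces together, a minimal sketch of how your first sensor could look with the explicit schedule (the 10:00 start, 16:00 cut-off and 5-minute poke interval are assumptions for illustration, not values from your setup):

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    poke_interval=60 * 5,   # check the Glue catalog every 5 minutes
    timeout=60 * 60 * 6,    # stop poking after 6 hours (10:00 -> 16:00)
    dag=dag)

With a long timeout like this a single attempt keeps poking until the cut-off; alternatively keep a short timeout and rely on the high retries count computed above.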

  • Thanks for this. Also, if I want to trigger the tasks based on an event rather than time, i.e. as soon as a new data partition is available in the source `AWS Athena Tables` the next task should be triggered, then how do I schedule it? Is my current code apt enough? – pankaj Apr 22 '20 at 04:44
  • @pankaj, I see only two alternatives. I don't know much about aws athena, but you could trigger a dag in airflow from an external event by using the command `airflow trigger_dag`. This would be possible if somehow you could trigger a lambda function / python script to target your airflow instance. – Bernardo stearns reisen Apr 22 '20 at 04:55
  • the other alternative is more or less what you are doing: because you don't have an event-based trigger you need to periodically check if the event happened. So using the current solution you would set a cron job for a range of hours and run the dag at a high frequency of minutes... many runs will fail but it will be able to catch the event fairly quickly after it happens – Bernardo stearns reisen Apr 22 '20 at 04:58
  • @Bernardo, I have figured out a sensor in Airflow called `AwsGlueCatalogPartitionSensor`, along with the Airflow macro `{{ ds_nodash }}`, for checking that the partition exists. My question then is how to schedule this. – pankaj Apr 22 '20 at 04:59
  • @Bernardo, can you have a look at my code where I have implemented the above-mentioned check and give your inputs? – pankaj Apr 22 '20 at 05:00
  • @pankaj , you can see from the docs https://airflow.apache.org/docs/1.10.6/_api/airflow/contrib/sensors/aws_glue_catalog_partition_sensor/index.html – Bernardo stearns reisen Apr 22 '20 at 05:01
  • you have a parameter `poke_interval (int) – Time in seconds that the job should wait in between each tries` – Bernardo stearns reisen Apr 22 '20 at 05:02
  • @pankaj let's move the discussion to the chat – Bernardo stearns reisen Apr 22 '20 at 05:03
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/212228/discussion-between-bernardo-stearns-reisen-and-pankaj). – Bernardo stearns reisen Apr 22 '20 at 05:03