I have a DAG that executes a Python script which takes a date argument (the current date). I'm scheduling the DAG to run at 6:00 AM Eastern Standard Time, Monday through Friday (i.e. on weekdays). The DAG has to run the Python script on Monday with Monday's date as the argument, and likewise for Tuesday through Friday, with Friday's run receiving Friday's date as the argument.
I noticed that using a schedule interval of '0 6 * * 1-5'
didn't work, because Friday's execution didn't occur until the following Monday.
I changed the schedule interval to '0 6 * * *'
so that the DAG runs every day at 6:00 AM and, at the start of my DAG, filters for dates that fall within '0 6 * * 1-5',
i.e. effectively Monday to Friday. For Saturday and Sunday, the downstream tasks should be skipped.
This is my code
from __future__ import print_function
import pendulum
import logging
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from croniter import croniter
log = logging.getLogger(__name__)


def filter_processing_date(**context):
    """Tell whether the run's execution date lies on the weekday 06:00 cron.

    Intended as the ``python_callable`` of a ShortCircuitOperator, so the
    boolean result decides whether downstream tasks proceed.

    Args:
        **context: Airflow task context; only ``context['execution_date']``
            is read.

    Returns:
        bool: True when ``execution_date`` is exactly a tick of the
        ``'0 6 * * 1-5'`` schedule, False otherwise.
    """
    execution_date = context['execution_date']
    cron = croniter('0 6 * * 1-5', execution_date)
    log.info('cron is: {}'.format(cron))
    log.info('execution date is: {}'.format(execution_date))
    # get_next()/get_prev() are methods of the croniter iterator, not of the
    # datetime they return. Chaining them as
    # ``cron.get_next(datetime).get_prev(datetime)`` raises AttributeError.
    # Step the iterator forward one tick, then back one tick: it lands on
    # execution_date again only if execution_date itself is a tick.
    cron.get_next(datetime)
    # NOTE(review): the cron expression is evaluated against execution_date
    # as given (including its timezone) -- confirm that lines up with the
    # America/New_York schedule this DAG is meant to follow.
    return execution_date == cron.get_prev(datetime)
# Timezone object used to make the DAG's start date timezone-aware.
local_tz = pendulum.timezone("America/New_York")

# Default arguments shared by the DAG's tasks.
default_args = dict(
    owner='Managed Services',
    depends_on_past=False,
    # Timezone-aware start date in local_tz.
    start_date=datetime(2020, 8, 3, tzinfo=local_tz),
    dagrun_timeout=None,
    # Recipient address is read from the Airflow Variable named 'email'.
    email=Variable.get('email'),
    email_on_failure=True,
    email_on_retry=False,
    provide_context=True,
    # Up to 12 retries, 5 minutes apart.
    retries=12,
    retry_delay=timedelta(minutes=5),
)
# Daily 06:00 DAG: start -> weekdays_only -> run_python -> end.
with DAG(
    'execute_python',
    schedule_interval='0 6 * * *',
    default_args=default_args,
) as dag:
    # Entry marker task.
    start_dummy = DummyOperator(task_id='start', dag=dag)

    # Exit marker task; uses the NONE_FAILED trigger rule.
    end_dummy = DummyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag,
    )

    # Gate task: its callable's boolean result decides whether the
    # downstream tasks run.
    weekdays_only = ShortCircuitOperator(
        task_id='weekdays_only',
        python_callable=filter_processing_date,
        dag=dag,
    )

    # Runs the processing script over SSH, passing the templated
    # ds_nodash value as the -d argument.
    run_python = SSHOperator(
        ssh_conn_id="oci_connection",
        task_id='run_python',
        command='/usr/bin/python3 /home/sb/local/bin/runProcess.py -d {{ ds_nodash }}',
        dag=dag,
    )

    # Wire the linear dependency chain one edge at a time.
    start_dummy >> weekdays_only
    weekdays_only >> run_python
    run_python >> end_dummy
Unfortunately, the weekdays_only task is failing with the error message below. What is going wrong?
Airflow error message continuation
Airflow version: v1.10.9-composer
Python 3.