
Is there a way to update an Airflow DAG daily (or periodically) based on its DAG-file definition code? E.g., to update date values that are used in the DAG definition.

For context: I have an Airflow DAG that gets new table rows each day from a remote DB and moves them into a local DB. To fetch only the latest rows from the remote, we have a function that gets the latest date from the local DB. Currently the DAG is defined like this:

...
def get_latest_date(tablename):
    # get latest import date from local table
    ....

for table in tables: # type list(dict)

    task_1 = BashOperator(
        task_id='task_1_%s' % table["tablename"],
        bash_command='bash %s/task_1.sh %s' % (PROJECT_HOME, table["latest_date"]),
        execution_timeout=timedelta(minutes=30),
        dag=dag)

    task_2 = BashOperator(
        task_id='task_2_%s' % table["tablename"],
        bash_command='bash %s/task_2.sh' % PROJECT_HOME,
        execution_timeout=timedelta(minutes=30),
        dag=dag)

    task_1 >> task_2

where tables is a list of dicts, one of whose fields is constructed earlier in the code as a string representation of the latest date for that table. When printing this supposed latest date in the task_1.sh script, I find that the date does not update each day. I need a way for the tables list to be built anew each day so that it has the right date values.
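The body of get_latest_date is elided above. A minimal sketch of what such a lookup could look like, assuming (hypothetically) that the local DB is SQLite and that each table stores an import_date column as ISO-8601 text; the name query_latest_date and that column are illustrative, not taken from the question:

```python
import sqlite3


def query_latest_date(conn, tablename):
    """Return the newest import_date in `tablename` (e.g. '2020-01-03'), or None if empty.

    Hypothetical helper: assumes an `import_date` TEXT column in YYYY-MM-DD format.
    """
    # Table names cannot be bound as SQL parameters, so reject anything
    # that is not a plain identifier before interpolating it.
    if not tablename.isidentifier():
        raise ValueError("unexpected table name: %r" % tablename)
    row = conn.execute('SELECT MAX(import_date) FROM "%s"' % tablename).fetchone()
    return row[0]  # MAX() over zero rows yields NULL, i.e. None


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE table1 (id INTEGER, import_date TEXT)")
    conn.executemany(
        "INSERT INTO table1 VALUES (?, ?)",
        [(1, "2020-01-01"), (2, "2020-01-03"), (3, "2020-01-02")],
    )
    print(query_latest_date(conn, "table1"))  # 2020-01-03
```

Storing dates as YYYY-MM-DD matters here: MAX() on a text column compares strings, and ISO-style strings sort chronologically, whereas strings like DD-MM-YYYY do not.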

lampShadesDrifter

1 Answer


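With the code below, each table's latest_date is looked up from your local DB inside a task, i.e. when the DAG runs rather than when the DAG file is parsed, and is then passed to your BashOperator through Airflow XCom and a Jinja template in bash_command.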

from airflow import DAG
from airflow.utils.dates import days_ago
# Airflow 1.10.x import paths (Airflow 2 moved them, e.g. to airflow.operators.python and airflow.operators.bash)
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.bash_operator import BashOperator
import logging

from datetime import datetime, timedelta

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id='example_dag',
    default_args=args,
    schedule_interval=None,  # None = manual triggering only; use '@daily' to run the import once a day
)


def get_latest_date(**kwargs):
    # Get the latest import date from the local table.
    logging.info("Table Name: {0}".format(kwargs['table_name']))
    # For demonstration this subtracts 'date_diff' days from today; replace it with
    # your actual query for the latest date in your local DB.
    latest_date = (datetime.today() - timedelta(days=kwargs['date_diff'])).strftime('%d-%m-%Y')
    logging.info("Latest Date: {0}".format(latest_date))
    # Push the latest date to this task's XCom under the key 'latest_date'.
    kwargs['ti'].xcom_push(key='latest_date', value=latest_date)

    # ShortCircuitOperator only continues if this return value is truthy;
    # an empty or None date would skip the tasks downstream of it.
    return latest_date

start_task = DummyOperator(task_id='Start_Task', dag=dag)
end_task = DummyOperator(task_id='End_Task', dag=dag)

# The tables list no longer needs a 'latest_date' entry in each dict.
tables_list = [{'tablename': 'table1'}, {'tablename': 'table2'}, {'tablename': 'table3'}, {'tablename': 'table4'}]
# For demonstration only, idx is passed as date_diff so that each table gets a different latest_date.
for idx, table in enumerate(tables_list): # type list(dict)

    get_latest_date_task = ShortCircuitOperator(
        task_id='Get_Latest_Date_In_Table_{0}'.format(table['tablename']),
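        # Needed on Airflow 1.10.x to pass 'ti' etc. into **kwargs; Airflow 2 passes the context automatically.
        provide_context=True,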
        python_callable=get_latest_date,
        op_kwargs={
            'table_name': table['tablename'],
            'date_diff': idx
        },
        dag=dag)

    # You can build the Jinja expression as a variable (task_1) or embed it directly in bash_command (task_2).
    # str.format() turns "{{" into "{", so the Jinja braces are written as "{{{{" and "}}}}" here.
    xcom_str = "{{{{ ti.xcom_pull(task_ids='Get_Latest_Date_In_Table_{0}', key='latest_date') }}}}".format(table['tablename'])
    task_1 = BashOperator(
        task_id='task_1_{0}'.format(table['tablename']),
        # To call your script instead of echo: 'bash %s/task_1.sh "%s"' % (PROJECT_HOME, xcom_str)
        bash_command='echo "' + xcom_str + '"',
        execution_timeout=timedelta(minutes=30),
        dag=dag)

    task_2 = BashOperator(
        task_id='task_2_{0}'.format(table['tablename']),
        bash_command='echo "{{ ti.xcom_pull("Get_Latest_Date_In_Table_' + table['tablename'] + '", key="latest_date") }}"',
        execution_timeout=timedelta(minutes=30),
        dag=dag)

    start_task >> get_latest_date_task >> task_1 >> task_2 >> end_task
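One subtlety when building these templates: str.format() treats "{{" and "}}" as escaped literal braces, so a Jinja expression assembled with .format() needs its braces doubled again (or can be built by plain string concatenation, as task_2 does). A quick plain-Python check, no Airflow required; xcom_template is a hypothetical helper name:

```python
def xcom_template(table_name):
    """Return the Jinja expression that pulls 'latest_date' from one table's lookup task."""
    # "{{{{" and "}}}}" come out of .format() as the literal "{{" and "}}" Jinja expects;
    # "{0}" is the only real replacement field.
    return ("{{{{ ti.xcom_pull(task_ids='Get_Latest_Date_In_Table_{0}', "
            "key='latest_date') }}}}").format(table_name)


# With only doubled braces, .format() leaves single braces, which Jinja passes through unrendered.
print("{{ ti.xcom_pull(task_ids='Get_Latest_Date_In_Table_{0}') }}".format("table1"))
# { ti.xcom_pull(task_ids='Get_Latest_Date_In_Table_table1') }

print(xcom_template("table1"))
# {{ ti.xcom_pull(task_ids='Get_Latest_Date_In_Table_table1', key='latest_date') }}
```

The template is rendered only when the BashOperator runs, after the matching lookup task has pushed its value, which is what lets the date change from one DAG run to the next.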
Sai Neelakantam