
Here is the flow and the task dependencies I want to achieve:

START ===> Create table list (only once, when the DAG is triggered) ===> Read & pass table names (via XCom) ===> Create one task dynamically for each table in the list ===> Print table name ===> END

Here is the sample code:

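import pyodbc
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# `dag` is assumed to be a DAG object defined earlier in this file

start = DummyOperator(
        task_id='start',
        dag=dag
)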

end = DummyOperator(
        task_id = 'end',
        dag=dag
)

#create table list:
def create_source_table_list(dsn, uid, pwd, exclude_table_list, **kwargs):
    cnxn = pyodbc.connect('DSN={};UID={};PWD={}'.format(dsn, uid, pwd))
    try:
        cursor = cnxn.cursor()
        tables_list = []
        for row in cursor.tables():
            tables_list.append(row.table_name)
    finally:
        # always release the connection, even if listing the tables fails
        cnxn.close()
    final_list = [ele for ele in tables_list if ele not in exclude_table_list]
    return final_list

create_table_list = PythonOperator(
                task_id = 'create_table_list',
                python_callable=create_source_table_list,
                provide_context=True,
                op_args=['DSNNAME','USERID', 'PASSWORD', ['TABLE1', 'TABLE2']], 
                dag=dag
)

#function for dynamic task generation
def createDynamicTask(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        python_callable=callableFunction,  # pass the function itself instead of eval() on its name
        op_kwargs=args,
        # PythonOperator pushes its return value to XCom by default
        email=['xyz.com'],
        email_on_failure=True,
        email_on_retry=False,
        dag=dag
    )
    return task

#function to print table names
def print_tables(table_name, **kwargs):
    ti = kwargs['ti']
    ls = ti.xcom_pull(task_ids='create_table_list')
    print("The table name is: ", table_name)

for table in create_source_table_list('DSNNAME', 'USERID', 'PASSWORD', ['TABLE1', 'TABLE2']):

    print_table_names = createDynamicTask('{}-dynamic_task'.format(table), print_tables, {'table_name': str(table)})

## set dependency
start >> create_table_list
create_table_list >> [print_table_names]
print_table_names >> end

However, I am facing the following problems with the above implementation:

  1. The table names aren't being stored in XCom, and I want to avoid storing the table list in a file on disk.
  2. Multiple DB calls are required for each table in the DB.
  3. The task dependencies are set incorrectly: only the last table in the list is wired up, like this: start ==> create_table_list ==> {LAST_TABLE_NAME in the list} ==> end

Please suggest what I am doing wrong.

Thanks!

ManiK
    Your approach will very likely not work. While it's possible to create tasks dynamically, I haven't seen a way to create them based on XCom. The DAG run has already started by the time XCom is filled, so adding or removing tasks based on it sounds like a bad idea to me. – Philipp Johannis Oct 30 '20 at 13:14

1 Answer

Since listing tables isn't computationally expensive, I'd recommend determining the list of tables outside of an Airflow task (for example at the top level of the DAG file). You can then generate the required tasks dynamically from that list.
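A minimal sketch of that idea, under stated assumptions: `sqlite3` stands in for `pyodbc` so the example runs anywhere (with `pyodbc` you would iterate `cursor.tables()` as in the question), and the task graph is modelled as plain `(upstream, downstream)` pairs rather than real operators. The names `list_tables` and `build_edges` are hypothetical. The point it illustrates is that the dependencies are set inside the loop, once per table, so every generated task is wired between `start` and `end`, not only the last one.

```python
import sqlite3


def list_tables(conn, exclude_table_list):
    """Return table names visible on a DB-API connection, minus excluded ones.

    sqlite3 stands in for pyodbc here; with pyodbc you would iterate
    cursor.tables() as in the question instead of querying sqlite_master.
    """
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    ).fetchall()
    excluded = set(exclude_table_list)
    return [name for (name,) in rows if name not in excluded]


def build_edges(tables):
    """Return (upstream, downstream) task-id pairs for start >> task >> end.

    The wiring happens inside the loop, once per table. In a real DAG file
    the loop body would instead be something like:
        task = PythonOperator(task_id=task_id, python_callable=print_tables,
                              op_kwargs={'table_name': table}, dag=dag)
        start >> task >> end
    """
    edges = []
    for table in tables:
        task_id = "{}-dynamic_task".format(table)
        edges.append(("start", task_id))
        edges.append((task_id, "end"))
    return edges


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    for name in ("TABLE1", "TABLE2", "customers", "orders"):
        conn.execute("CREATE TABLE {} (id INTEGER)".format(name))
    tables = list_tables(conn, ["TABLE1", "TABLE2"])
    print(tables)  # ['customers', 'orders']
    print(build_edges(tables))
```

Because the list is computed when the file is parsed, no XCom is involved. The trade-off is that the scheduler runs the listing query every time it parses the DAG file.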

It's a bit hacky, but if you really want the tables to be listed in an Airflow operator, you could consider doing the following:

  • Create a separate (unscheduled) DAG with a task that updates the list of tables and stores it in an Airflow Variable.
  • Use the TriggerDagRunOperator to trigger your dynamic DAG, which then contains tasks based on the contents of that Variable.
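The two steps above could be sketched roughly as follows. This is a hedged outline, not a tested DAG: the Variable key `source_table_list`, the helper names, and the DAG/task ids in the comments are all hypothetical. It relies on `Variable.set(..., serialize_json=True)` and `Variable.get(..., default_var=..., deserialize_json=True)` from `airflow.models`; Airflow is imported lazily inside the functions so that the pure filtering helper can run without it.

```python
# Hypothetical Variable key; not from the original post.
TABLE_LIST_KEY = "source_table_list"


def filter_tables(all_tables, exclude_table_list):
    """Drop excluded table names while keeping the original order."""
    excluded = set(exclude_table_list)
    return [t for t in all_tables if t not in excluded]


def save_table_list(all_tables, exclude_table_list, **kwargs):
    """Task body for the separate, unscheduled controller DAG.

    Stores the filtered list as a JSON-serialized Airflow Variable.
    **kwargs lets it be used with provide_context=True. In that DAG this
    task would be followed by something like
        TriggerDagRunOperator(task_id='trigger_tables_dag',
                              trigger_dag_id='dynamic_tables_dag')
    where both ids are hypothetical.
    """
    from airflow.models import Variable  # lazy import: only needed inside Airflow
    final_list = filter_tables(all_tables, exclude_table_list)
    Variable.set(TABLE_LIST_KEY, final_list, serialize_json=True)
    return final_list


def load_table_list():
    """Read the list at parse time in the dynamic DAG file.

    Returns [] until the controller DAG has stored the Variable at least once.
    """
    from airflow.models import Variable
    return Variable.get(TABLE_LIST_KEY, default_var=[], deserialize_json=True)
```

The dynamic DAG file would then loop over `load_table_list()` to create one task per table. Reading the Variable at the top level still queries Airflow's metadata database on every parse, but it is one lightweight lookup rather than a connection to the source database.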

P.S. I'm guessing you're using MS SQL. You could use the MsSqlOperator to query the database.

bartcode