1

Currently I'm having an Airflow dag which is taking multiple values as arguments and planning to use them as dynamic to run the steps within the dag.

For Eg I have this method to push the values into XCom:

def push_to_xcom(ds, **kwargs):
    shape_change_tables = []
    ss_cd = {}
    env = ''

    if 'env' in kwargs['dag_run'].conf:
        env = kwargs['dag_run'].conf['env']
    else:
        env = 'dev'
        print("by default environment take as 'dev'")
    
    if isinstance(kwargs['dag_run'].conf['ss_cd'], dict):
        ss_cd = dict(kwargs['dag_run'].conf['ss_cd'])
    else:
        print('<<<<<<<<<<Pass sscd as an argument>>>>>>>>>>>')
        sys.exit(-1)

    if isinstance(kwargs['dag_run'].conf['shape'], list):
        shape_change_tables = list(kwargs['dag_run'].conf['shape'])
    else:
        print('<<<<<<<<<<Pass shape change tables as an argument>>>>>>>>>>>')
        sys.exit(-1)
    
    kwargs['ti'].xcom_push(key='shape_change_tables', value=shape_change_tables)
    kwargs['ti'].xcom_push(key='ss_cd', value=ss_cd)
    kwargs['ti'].xcom_push(key='env', value=env)
    

I'd need to use those 3 xcom variables outside the operator within the same dag.

Let's say I'd need to use the variable shape_change_tables which is a dictionary, in the following loop:

for i in json.loads(open('somejson.json', 'r').read())['tables'].keys():
    # Id want to use the above Xcom Variable shape_change_tables in the below if condition
    if i in {val for dic in shape_change_tables for val in dic.values()}:   
        

How can I pull the value? Can I simply use below before the if condition?

shape_change_tables = ti.xcom_pull(key='shape_change_tables', task_ids='push_to_xcom')

If anyone has come across the same, any help would be appreciated.

Kulasangar
  • 9,046
  • 5
  • 51
  • 82
  • You can put the json operation into downstream PythonOperator task. Does this help? https://stackoverflow.com/a/46819184/2956135 – Emma Nov 11 '21 at 15:49

0 Answers0