Currently I'm having an Airflow dag which is taking multiple values as arguments and planning to use them as dynamic to run the steps within the dag.
For Eg I have this method to push the values into XCom:
def push_to_xcom(ds, **kwargs):
shape_change_tables = []
ss_cd = {}
env = ''
if 'env' in kwargs['dag_run'].conf:
env = kwargs['dag_run'].conf['env']
else:
env = 'dev'
print("by default environment take as 'dev'")
if isinstance(kwargs['dag_run'].conf['ss_cd'], dict):
ss_cd = dict(kwargs['dag_run'].conf['ss_cd'])
else:
print('<<<<<<<<<<Pass sscd as an argument>>>>>>>>>>>')
sys.exit(-1)
if isinstance(kwargs['dag_run'].conf['shape'], list):
shape_change_tables = list(kwargs['dag_run'].conf['shape'])
else:
print('<<<<<<<<<<Pass shape change tables as an argument>>>>>>>>>>>')
sys.exit(-1)
kwargs['ti'].xcom_push(key='shape_change_tables', value=shape_change_tables)
kwargs['ti'].xcom_push(key='ss_cd', value=ss_cd)
kwargs['ti'].xcom_push(key='env', value=env)
I'd need to use those 3 xcom variables outside the operator within the same dag.
Let's say I'd need to use the variable shape_change_tables which is a dictionary, in the following loop:
for i in json.loads(open('somejson.json', 'r').read())['tables'].keys():
# Id want to use the above Xcom Variable shape_change_tables in the below if condition
if i in {val for dic in shape_change_tables for val in dic.values()}:
How can I pull the value? Can I simply use below before the if condition?
shape_change_tables = ti.xcom_pull(key='shape_change_tables', task_ids='push_to_xcom')
If anyone has come across the same, any help would be appreciated.