I am having hard time looping over an airflow variable in my script
so I have a requirement to list all files prefixed by string in a bucket.
next loop through it and do some operations.
I tried making use of xcomm and subdags but i couldn't figure it out so i came up with a new approach.
it involves 2 scripts though
1 st scripts sets the airflow variable with a value i generate
below is the code.
#!/usr/bin/env python
with DAG('Test_variable', default_args=default_args,
schedule_interval=None
) as dag:
GCS_File_list = GoogleCloudStorageListOperator(
task_id= 'list_Files',
bucket= 'bucketname',
prefix='aaa',
delimiter='.csv',
google_cloud_storage_conn_id='google_cloud_default'
#provide_context = True,
#dag = dag
)
def update_variable(**context):
files_list = Variable.get('files_list')
print(type(Variable.get('files_list')))
updated_file_list = context['ti'].xcom_pull(task_ids='list_Files')
#updated_file_list = updated_file_list.strip('][').split(',')
Variable.set("files_list", updated_file_list)
#print(updated_file_list)
#print(type(updated_file_list))
python_task = PythonOperator(
task_id= 'pass_list',
provide_context=True,
python_callable=update_variable,
#op_kwargs={'input_file':file},
trigger_rule=TriggerRule.ALL_SUCCESS,
#provide_context = True,
#xcom_push=True,
dag=dag
)
GCS_File_list >> python_task
as you see the above script lists files from a bucket and set the result to an airflow variable.
script 2: import the variable value set in script 1 and loop over it
this is where i can get the variable value but cant iterate over it
script 2:
#!/usr/bin/env python
from datetime import datetime, timedelta
from airflow import DAG
from airflowag.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators import PythonOperator
from datetime import datetime
YESTERDAY = datetime.combine(
datetime.today() - timedelta(days=1), datetime.min.time())
BQ_DATASET_NAME = 'abc'
CURRENT_TIME = datetime
files_list_str = Variable.get("files_list")
files_list = files_list_str.strip('][').split(',')
bucket = 'def'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': YESTERDAY,
'provide_context': True,
}
def gen_date(input_file,**kwargs):
# stepl1: check for file extension and remove it
idx_extension = input_file.find(".")
input_file_name = input_file[:idx_extension]
find_date_time_part = re.findall("_(\d*?_\d*?_\d*)",input_file_name)
find_date_time_part = str(find_date_time_part).split('_', 1)[-1].strip(']')
find_date_time_part = str(find_date_time_part)
find_date_time_part = re.sub("'",'', find_date_time_part)
find_date_time_part_len = len(find_date_time_part)
if find_date_time_part_len == 15:
x = [a for a in find_date_time_part.split('_') if a]
#get the date time part from the list
x = (' '.join(x[-2:]))
#Using strptime to parse the string value as datetime object here our date format is YYYYMMDD hhmiss
dt_obj = datetime.strptime(x, "%Y%m%d %H%M%S")
# use strftime to format the date object into desired format in our case YYYY-MM-DD hh:mi:ss
final_date_formatted = dt_obj.strftime("%Y-%m-%d %H:%M:%S")
#print(type(find_date_time_part))
return final_date_formatted
else:
print("{}_{}".format(files_list,input_file))
with DAG('dag1', default_args=default_args,
schedule_interval=None
) as dag:
for file in enumerate (files_list):
Python_Task = PythonOperator(
task_id='pass_date',
provide_context=True,
python_callable=gen_date,
op_kwargs={'input_file':file},
trigger_rule=TriggerRule.ALL_SUCCESS,
provide_context = True,
#xcom_push=True,
dag=dag
)
Python_Task
as you see in this function gen_date() i am printing out the variable name as well as the input file name in the else block
the output of the statement print("{}_{}".format(files_list,input_file) is ['abc.csv','def.csv']_[
I am not sure why a "[" is getting passed instead of input_file name any advise appreciated.
i see now that in for loop file_list is being treated as string and not a list
how can i make the files_list as a list and not string.