
Before I begin, I apologize, since this type of question has been asked before, but I am still having a hard time understanding how to do the scenario below.

I have been working with Airflow and Python professionally for only a month now, so please excuse the horribly written Python function; it basically takes a string file name and returns a string value back that I can use for deltas.

Steps: I want to bring in all files inside a bucket prefixed by ABC and iterate through them.

Approach: Below is the code.

#!/usr/bin/env python

import re
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.python_operator import PythonOperator
#from airflow.contrib.hooks import gcs_hook
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
#import GenDeltaDate
#from airflow.operators import InvalidDataFilterOperator

YESTERDAY = datetime.combine(
    datetime.today() - timedelta(days=1), datetime.min.time())
BQ_DATASET_NAME = 'Master'
CURRENT_TIME = datetime.now()


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': YESTERDAY,
    #'email': [],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'provide_context': True,
    'dataflow_default_options': {
        'project': 'project',
        'zone': 'us-east1-f'
    }
}
files_to_process = ['abc']
bucket = 'bucket_name'


def pull(**context):
    archive(context['ti'].xcom_pull(task_ids='list_files'))
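# (the `pull` helper above assumes `archive` is defined elsewhere;
# it is not actually wired into the DAG below)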

def gen_delta_date(input_file, **kwargs):
    # step 1: find the file extension and remove it
    idx_extension           = input_file.find(".")
    input_file_name         = input_file[:idx_extension]
    # look for 3 groups of digits separated by underscores and grab that value
    find_date_time_part     = re.findall(r"_(\d*?_\d*?_\d*)", input_file_name)
    # massage the value by removing unneeded characters
    find_date_time_part     = str(find_date_time_part).split('_', 1)[-1].strip(']')
    find_date_time_part     = re.sub("'", '', str(find_date_time_part))
    find_date_time_part_len = len(find_date_time_part)
    '''
    to-do:
    1. remove the hard-coded length value and pass it in as a parameter.
    '''
    if find_date_time_part_len == 15:
        # split the transformed input file name on underscores into a list
        x = [a for a in find_date_time_part.split('_') if a]
        # keep only the date and time parts (the last two elements)
        x = (' '.join(x[-2:]))
        # parse the string as a datetime object; the format is YYYYMMDD hhmiss
        dt_obj = datetime.strptime(x, "%Y%m%d %H%M%S")
        # format the datetime object into the desired YYYY-MM-DD hh:mi:ss form
        final_date_formatted = dt_obj.strftime("%Y-%m-%d %H:%M:%S")
        return final_date_formatted
    else:
        print("Error: input filename does not match the naming convention. "
              "The file name should end in *xx_YYYYMMDD_hhmiss (xx numeric) for proper parsing; "
              "got length {0} for file {1}".format(find_date_time_part_len, input_file))

with DAG('Test', default_args=default_args, schedule_interval=None) as dag:
    for item in files_to_process:
        # List the files in the bucket.
        GCS_File_list = GoogleCloudStorageListOperator(
            task_id='list_files',
            bucket=bucket,
            prefix='ABC',
            delimiter='.csv',
            google_cloud_storage_conn_id='google_cloud_default',
            dag=dag,
        )

        for idx, file in enumerate(["{{ ti.xcom_pull(task_ids='list_files') }}"]):
            Python_Task = PythonOperator(
                task_id=item + '_pass_date',
                provide_context=True,
                python_callable=gen_delta_date,
                op_kwargs={'input_file': file},
                trigger_rule=TriggerRule.ALL_SUCCESS,
                dag=dag,
            )
            sql_task = BigQueryOperator(
                task_id='query',
                sql='test.sql',
                destination_dataset_table='{0}.list_test'.format(BQ_DATASET_NAME),
                bigquery_conn_id='bigquery_default',
                use_legacy_sql=False,
                trigger_rule=TriggerRule.ALL_SUCCESS,
                provide_context=True,
                create_disposition='CREATE_IF_NEEDED',
                write_disposition='WRITE_APPEND',
            )
# Orchestration.
GCS_File_list >> Python_Task >> sql_task

But after checking I see that the file name passed to the Python function is not being templated; it is passed through as the literal string "{{ ti.xcom_pull(task_ids='list_files') }}" instead.
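If I understand correctly, this is expected: Jinja templates are rendered only at run time, and only inside an operator's template_fields, while a plain Python for loop runs at DAG-parse time. So my loop sees exactly one element, the raw template text:

# At parse time this list has exactly one element: the literal template string.
for idx, file in enumerate(["{{ ti.xcom_pull(task_ids='list_files') }}"]):
    print(idx, file)  # prints: 0 {{ ti.xcom_pull(task_ids='list_files') }}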

After doing some research I found exactly the same code, along with the reasons why this won't work: Airflow unable to iterate through xcom_pull list with GoogleCloud Operators.

In the above post it was suggested to make use of subdags to achieve this functionality. But say my task GCS_File_list is a subdag: how do I return its values as a list back to my main DAG, so that I can loop over the list of files to run Python_Task and sql_task?

As far as I understand, I have to use "{{ ti.xcom_pull(task_ids='list_files') }}" inside an operator's templated fields, not the way I did in the code above (for idx, file in enumerate(["{{ ti.xcom_pull(task_ids='list_files') }}"])). But then how can I store the value as a list?
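From what I gather, one workaround (just a sketch of the general idea, untested) would be to move the iteration to run time inside a single PythonOperator, where the pulled XCom value is already a real Python list:

def process_listed_files(**context):
    # at run time this is the actual list returned by GoogleCloudStorageListOperator
    files = context['ti'].xcom_pull(task_ids='list_files') or []
    for input_file in files:
        print(input_file, gen_delta_date(input_file))

process_files = PythonOperator(
    task_id='process_listed_files',
    python_callable=process_listed_files,
    provide_context=True,
    dag=dag,
)

GCS_File_list >> process_files

But that gives me one task for the whole list rather than one task per file, which is why I am asking about the subdag route.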

Any pointers or advice would be much appreciated.

Thanks.

kumarm

1 Answer


Hi, I have used a completely different approach to solve this. For anyone interested, see: loop over airflow variables issue question.
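Roughly, the idea is to have one task store the file listing in an Airflow Variable, and to have the DAG file read that Variable at parse time to build one task per file. A sketch only (the Variable name abc_file_list is just an example; the linked question has the full version):

from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

# a prior task stores the GCS listing as JSON, e.g.:
#   Variable.set('abc_file_list', json.dumps(file_list))
files = Variable.get('abc_file_list', deserialize_json=True, default_var=[])

for input_file in files:
    safe_id = input_file.replace('.', '_').replace('/', '_')
    PythonOperator(
        task_id='pass_date_{}'.format(safe_id),
        python_callable=gen_delta_date,
        op_kwargs={'input_file': input_file},
        dag=dag,
    )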

Regards.

kumarm
  • Not the most elegant way, and I'm not sure this method is prescribed; it needs a lot of testing, but on the surface it works well. – kumarm Sep 12 '19 at 11:13