I have a task where I save a file named in the format 'account_{year}_{month}.csv' to Google Cloud Storage - I am using a PythonOperator. For example, this month's file is called 'account_2023_06.csv'; next month it will be named 'account_2023_07.csv'.
Then, in the next step, I am supposed to retrieve that newly uploaded file from Cloud Storage and save it as a BigQuery table:
dataset_CS_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id="dataset_CS_BQ",
    bucket=BUCKET_accounts_historical,
    source_objects=[....csv]
My challenge is that I cannot figure out what to pass to source_objects, since the file name changes every month. The options I could think of were either to drop {year} and {month} from the file name altogether, or to find the most recently modified file in the bucket folder and somehow pass its name along, but both feel like over-simplifications, as it is desirable to keep yyyy_mm as part of the file name.
Could you please point me to some reading, or advise me on how I could accomplish this?
Update: Resolved by pushing the file name to XCom from the Python callable:
from google.cloud import storage

def accounts(**kwargs):
    auth = (kwargs['username'], kwargs['password'])
    ti = kwargs['ti']
    ...
    # Instantiate the Cloud Storage client
    client = storage.Client()
    bucket = client.get_bucket(BUCKET_accounts_historical_sl)
    # Build this month's object path and push it to XCom for the downstream task
    path = "/accounts/" + 'account_' + str(yr) + '_' + str(mnth) + '.csv'
    ti.xcom_push(key='path', value=path)
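For context, a self-contained version of that callable could look like the sketch below. The parts elided above are filled in with assumptions: the credentials arriving via op_kwargs, yr/mnth being derived from the run's execution_date, and the CSV content itself, which is just placeholder data here.

from google.cloud import storage

def accounts(**kwargs):
    # Credentials for the upstream API (assumed to arrive via op_kwargs).
    auth = (kwargs['username'], kwargs['password'])
    ti = kwargs['ti']

    # Assumption: stamp the file with the month of the run's logical date.
    execution_date = kwargs['execution_date']
    yr = execution_date.year
    mnth = f"{execution_date.month:02d}"

    # ... fetch the account data with `auth` and build the CSV (elided) ...
    csv_data = "id,balance\n"  # placeholder content

    # Upload to Cloud Storage; the object name is relative to the bucket.
    client = storage.Client()
    bucket = client.get_bucket(BUCKET_accounts_historical_sl)
    path = "accounts/" + 'account_' + str(yr) + '_' + str(mnth) + '.csv'
    bucket.blob(path).upload_from_string(csv_data, content_type="text/csv")

    # Share the object name with the downstream load task.
    ti.xcom_push(key='path', value=path)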
Then, added provide_context to the task itself:
download_dataset_task = PythonOperator(
    task_id="download_dataset_task",
    python_callable=accounts,
    provide_context=True,
)
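If the username and password come in through op_kwargs (an assumption, since their source is not shown above), the task definition might look like this; note that on Airflow 2.x the context is passed to the callable automatically and provide_context is no longer needed:

download_dataset_task = PythonOperator(
    task_id="download_dataset_task",
    python_callable=accounts,
    provide_context=True,  # not needed on Airflow 2.x
    op_kwargs={"username": API_USERNAME, "password": API_PASSWORD},  # hypothetical variables
)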
Finally, retrieved the stored XCom value with:
dataset_CS_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id="dataset_CS_BQ",
    bucket=BUCKET_accounts_historical,
    source_objects=["{{ ti.xcom_pull(key='path') }}"],