I am trying to create a DAG in Apache Airflow that performs two tasks: Task 1 pulls data from a Google Sheet, cleans the data, and returns a pandas DataFrame. Task 2 takes this DataFrame and pushes the data to a BigQuery table. I have set up the tasks and dependencies in my DAG, but Task 2 is unable to pull the data from Task 1 using XCom.
I have written the code to push the data from Task 1 to XCom, and the code to pull the data from XCom in Task 2. However, Task 2 is unable to pull the data and the value of the variable df_json is None.
I have checked that the XCom key is the same in both tasks and that the DataFrame is correctly converted to a JSON string in Task 1. I have also confirmed that both tasks set `provide_context=True` in their `PythonOperator` definitions.
I have included my code below for reference. Can someone help me figure out why Task 2 is unable to pull the data from Task 1 and what I can do to fix this issue?
# Import the necessary modules
def pull_data_from_gsheet(**kwargs):
    # (the Google Sheets fetch that produces filtered_df is omitted here)
    # create a new dataframe with the same values as the original dataframe
    df = filtered_df.copy()
    year = datetime.today().year
    # convert Day column to string type
    df['day'] = df['day'].astype(str)
    # extract only the numeric part of the string (raw string avoids an invalid-escape warning)
    df['day'] = df['day'].str.extract(r'(\d+)')
    # create a new column with the month number
    df['month_num'] = pd.to_datetime(df['month'], format='%B').dt.month
    # create a temporary date column; errors='coerce' turns unparseable dates into NaT
    df['date_temp'] = pd.to_datetime(
        df['month_num'].astype(str) + '/' + df['day'].astype(str) + '/' + str(year),
        format='%m/%d/%Y', errors='coerce')
    # report and remove rows with invalid day values
    invalid_days = df.loc[df['date_temp'].isnull(), 'day']
    print('Invalid day values:', invalid_days.tolist())
    df = df.drop(df.loc[df['date_temp'].isnull()].index)
    # the remaining rows all parsed successfully, so the temporary column holds the final dates
    df['date'] = df['date_temp']
    # drop the intermediate column
    df.drop(columns=['date_temp'], inplace=True)
    # convert datetime objects to date objects
    df['date'] = df['date'].dt.date
    # display the cleaned dataframe in tabular format
    print(tabulate(df, headers='keys', tablefmt='mixed_grid'))
    # Save the DataFrame as a JSON string
    df_json = df.to_json(orient='split', index=False)
    # Push the JSON string to XCom
    kwargs['ti'].xcom_push(key='transformed_data', value=df_json)
    # return df  # returning the DataFrame would also push it to XCom under the default key 'return_value'
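As a sanity check that is independent of Airflow, the `orient='split'` round-trip used above can be verified in isolation. This is a minimal sketch using stand-in data, not my real sheet:

```python
import pandas as pd
from io import StringIO

# Stand-in for the cleaned DataFrame (not the real sheet data)
df = pd.DataFrame({"name": ["a", "b"], "value": [1, 2]})

# Serialize the same way Task 1 does
df_json = df.to_json(orient='split', index=False)

# Deserialize the same way Task 2 does
# (StringIO avoids a FutureWarning on pandas >= 2.1 when passing a literal string)
df_restored = pd.read_json(StringIO(df_json), orient='split')

print(df_restored.equals(df))  # the frame survives the round-trip unchanged
```

If this round-trip succeeds, the serialization itself is not the problem, which points at the XCom hand-off between the tasks.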
def push_data_to_bigquery(**kwargs):
    # Pull the JSON string from XCom (naming the upstream task explicitly avoids ambiguity)
    df_json = kwargs['ti'].xcom_pull(task_ids='pull_data_from_gsheet', key='transformed_data')
    if df_json is not None:
        # Convert the JSON string back to a DataFrame
        df = pd.read_json(df_json, orient='split')
        print(df)
        # Write the DataFrame to a BigQuery table
        gbq.to_gbq(df, destination_table=bigquery_table_name,
                   project_id=project_id, if_exists='replace')
    else:
        raise ValueError("No data found in XCom for key 'transformed_data'.")
# Create the `PythonOperator` tasks and set the task dependencies:
pull_data_task = PythonOperator(
task_id='pull_data_from_gsheet',
python_callable=pull_data_from_gsheet,
provide_context=True,
dag=dag,
)
push_data_task = PythonOperator(
task_id='push_data_to_bigquery',
python_callable=push_data_to_bigquery,
provide_context=True,
dag=dag,
)
push_data_task.set_downstream(pull_data_task)
Note: I am running this on my local machine for now.
I am aware that XCom is not designed for passing large amounts of data, so if the DataFrame is very large I may need to use an intermediary storage system (e.g., Google Cloud Storage or S3) to pass data between tasks.
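To illustrate that pattern, here is a minimal sketch of staging the DataFrame to a file and passing only its path through XCom. The local temp file is a stand-in for a GCS or S3 URI, and the function names are hypothetical:

```python
import os
import tempfile

import pandas as pd

def stage_dataframe(df: pd.DataFrame) -> str:
    """Write the DataFrame to shared storage and return its location.

    In production this would be a GCS URI (e.g. writing to 'gs://...' via
    gcsfs); a local temp file stands in here since the DAG runs on one machine.
    """
    path = os.path.join(tempfile.gettempdir(), 'transformed_data.csv')
    df.to_csv(path, index=False)
    return path  # push this short string to XCom instead of the whole DataFrame

def load_staged_dataframe(path: str) -> pd.DataFrame:
    """Read the staged DataFrame back in the downstream task."""
    return pd.read_csv(path)

# Simulated hand-off: Task 1 would xcom_push the path, Task 2 would xcom_pull it
df = pd.DataFrame({"name": ["a", "b"], "value": [1, 2]})
staged_path = stage_dataframe(df)
df_back = load_staged_dataframe(staged_path)
print(df_back.equals(df))
```

Only the small path string goes through XCom, so the size of the DataFrame no longer matters.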
Task 1 produces the expected output when I run it locally with `airflow tasks test Gsheet_to_bq pull_data_from_gsheet 2023-03-19` (screenshots of the Task 1 and Task 2 output omitted here).
Specifically, I am trying to push the transformed data from Task 1 to XCom and then pull it in Task 2 to write to a BigQuery table, but the data is not flowing between the tasks. I have already verified that the XCom key and value are specified consistently in both tasks and that the JSON conversion succeeds before the push, yet the problem persists.
Could you please provide me with some guidance or advice on how to resolve this issue? I would greatly appreciate any suggestions or solutions you may have.
Thank you very much for your time and assistance.