
I am trying to create a DAG in Apache Airflow that performs two tasks: Task 1 pulls data from a Google Sheet, cleans the data, and returns a pandas DataFrame. Task 2 takes this DataFrame and pushes the data to a BigQuery table. I have set up the tasks and dependencies in my DAG, but Task 2 is unable to pull the data from Task 1 using XCom.

I have written the code to push the data from Task 1 to XCom, and the code to pull the data from XCom in Task 2. However, Task 2 is unable to pull the data: the value of the variable `df_json` is `None`.

I have checked that the XCom key is the same in both tasks and that the DataFrame is being correctly converted to a JSON string in Task 1. I have also checked that both tasks have `provide_context=True` set in the `PythonOperator`.

I have included my code below for reference. Can someone help me figure out why Task 2 is unable to pull the data from Task 1 and what I can do to fix this issue?

# Import the necessary modules
import pandas as pd
import pandas_gbq as gbq
from datetime import datetime
from tabulate import tabulate
from airflow.operators.python import PythonOperator  # Airflow 2 import path

# Code Details are here (the DAG definition, the Google Sheets read that
# produces filtered_df, and the BigQuery settings bigquery_table_name /
# project_id are omitted)


def pull_data_from_gsheet(**kwargs):
    # create a new dataframe with the same values as the original dataframe
    df = filtered_df.copy()

    year = datetime.today().year

    # convert day column to string type
    df['day'] = df['day'].astype(str)

    # extract only the numeric part of the string
    df['day'] = df['day'].str.extract(r'(\d+)', expand=False)

    # create a new column with the month number
    df['month_num'] = pd.to_datetime(df['month'], format='%B').dt.month

    # create a temporary date column; unparseable values become NaT
    df['date_temp'] = pd.to_datetime(
        df['month_num'].astype(str) + '/' + df['day'] + '/' + str(year),
        format='%m/%d/%Y', errors='coerce')

    # check for invalid day values
    invalid_days = df.loc[df['date_temp'].isnull(), 'day']

    # print the invalid day values
    print('Invalid day values:', invalid_days.tolist())

    # remove rows with invalid day values
    df = df.drop(df.loc[df['date_temp'].isnull()].index)

    # create a new column with the date (all remaining rows parse cleanly)
    df['date'] = pd.to_datetime(
        df['month_num'].astype(str) + '/' + df['day'] + '/' + str(year),
        format='%m/%d/%Y')

    # drop the intermediate column
    df.drop(columns=['date_temp'], inplace=True)

    # convert datetime objects to date objects
    df['date'] = df['date'].dt.date

    # display the cleaned dataframe in a tabular format
    print(tabulate(df, headers='keys', tablefmt='mixed_grid'))

    # Save the DataFrame as a JSON string
    df_json = df.to_json(orient='split', index=False)

    # Push the JSON string to XCom
    kwargs['ti'].xcom_push(key='transformed_data', value=df_json)

    # return df
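    # (Side note: returning df_json here instead would make the
    # PythonOperator push it to XCom automatically under the default key
    # 'return_value', so the downstream task could pull it without an
    # explicit xcom_push.)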




def push_data_to_bigquery(**kwargs):
    # Pull the JSON string from XCom
    df_json = kwargs['ti'].xcom_pull(key='transformed_data')

    if df_json is not None:
        # Convert the JSON string back to a DataFrame
        df = pd.read_json(df_json, orient='split')
        print(df)

        # code to write the data to BigQuery
        # df = df.where(pd.notnull(df), None)
        # rows_to_insert = cleaned_df.to_dict(orient='records')
        # client.insert_rows(table, rows_to_insert)

        # Write the DataFrame to a BigQuery table
        gbq.to_gbq(df, destination_table=bigquery_table_name,
                   project_id=project_id, if_exists='replace')
    else:
        raise ValueError("No data found in XCom for key 'transformed_data'.")

Create the `PythonOperator` tasks and set the task dependencies:

pull_data_task = PythonOperator(
    task_id='pull_data_from_gsheet',
    python_callable=pull_data_from_gsheet,
    provide_context=True,
    dag=dag,
)

push_data_task = PythonOperator(
    task_id='push_data_to_bigquery',
    python_callable=push_data_to_bigquery,
    provide_context=True,
    dag=dag,
)

push_data_task.set_downstream(pull_data_task)

Note: I am running the code on my local system for now.

I am aware that XCom is not designed for passing large amounts of data, so if the DataFrame is very large I may need to use an intermediary storage system (e.g., Google Cloud Storage, S3, etc.) to pass data between tasks.
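For completeness, a minimal sketch of that intermediary-storage pattern could look like the following. The bucket path is a placeholder, and reading/writing `gs://` paths from pandas assumes the `gcsfs` package is installed:

def pull_data_from_gsheet(**kwargs):
    # ... same cleaning as above ...
    path = 'gs://my-bucket/transformed_data.csv'  # hypothetical bucket
    df.to_csv(path, index=False)  # pandas uses gcsfs for gs:// URLs
    kwargs['ti'].xcom_push(key='transformed_data_path', value=path)


def push_data_to_bigquery(**kwargs):
    # pull only the small path string through XCom, not the data itself
    path = kwargs['ti'].xcom_pull(task_ids='pull_data_from_gsheet',
                                  key='transformed_data_path')
    df = pd.read_csv(path)
    gbq.to_gbq(df, destination_table=bigquery_table_name,
               project_id=project_id, if_exists='replace')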

Task 1 gives me output when I run it locally with `airflow tasks test Gsheet_to_bq pull_data_from_gsheet 2023-03-19`. (Screenshots of the Task 1 and Task 2 output omitted.)

Specifically, I am trying to push the transformed data from Task 1 to XCom and then pull it from XCom in Task 2 to write it to a BigQuery table, but the data is not flowing between the tasks.

I have tried several approaches to resolve this, including making sure the XCom key and value are specified the same way in both tasks and that the DataFrame is converted to a JSON string before it is pushed to XCom. However, I am still unable to get the data to flow between the tasks.

Could you please provide me with some guidance or advice on how to resolve this issue? I would greatly appreciate any suggestions or solutions you may have.

Thank you very much for your time and assistance.

  • you need to provide the task id in xcom_pull, e.g. `.xcom_pull(task_ids='pull_data_from_gsheet', key='transformed_data')` – mck Mar 19 '23 at 08:18
  • I wanted to express my gratitude for the time you took out of your busy schedule to provide me with your suggestion. I greatly appreciate your willingness to help me with my task. I did try implementing your suggestion by adding the task id, but unfortunately, I am still encountering an error. Despite my efforts, I am receiving the following error message: File "/opt/airflow/dags/df_to_bq_learning.py", line 215, in push_data_to_bigquery raise ValueError("No data found in XCom for key 'transformed_data'.") ValueError: No data found in XCom for key 'transformed_data'. – Rajeev Pandey Mar 19 '23 at 09:40
  • instead of `push_data_task.set_downstream(pull_data_task)`, I think you can use `pull_data_task >> push_data_task`, i.e. you put the two tasks in the other way round – mck Mar 19 '23 at 11:24
  • Once more, I'd like to thank you for responding. Tried everything, but I keep getting the same error. I can see that the Xcom area for task 1 has data in my Airflow user interface, however, the Xcom section for job 2 does not have any information. It would appear that data are not being sent from task 1 to task 2. I even attempted to turn it into a dictionary rather than JSON, but without any success at all. I have no idea what I might possibly be doing wrong. – Rajeev Pandey Mar 19 '23 at 13:36
  • can you try to minimise your problem into a [MRE](https://stackoverflow.com/help/minimal-reproducible-example) and edit your question accordingly? – mck Mar 19 '23 at 14:09
  • As requested, Updated the code based on MRE standards. – Rajeev Pandey Mar 20 '23 at 02:58
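
For reference, putting the two comment suggestions together (an explicit `task_ids` in the `xcom_pull`, and the dependency declared pull-first), the relevant lines would look something like this:

# inside push_data_to_bigquery: pull with an explicit task id
df_json = kwargs['ti'].xcom_pull(task_ids='pull_data_from_gsheet',
                                 key='transformed_data')

# make the pull task run before the push task
pull_data_task >> push_data_task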
