
I am creating an Airflow @daily DAG. It has an upstream task, get_daily_data (a BigQueryGetDataOperator), which fetches data based on execution_date, and a downstream dependent task (a PythonOperator) that uses this date-based data via xcom_pull. When I run the airflow backfill command, the downstream task process_data_from_bq, where I do the xcom_pull, gets only the most recent data, not the data for the same execution date that it expects. The Airflow documentation says: "If xcom_pull is passed a single string for task_ids, then the most recent XCom value from that task is returned." However, it does not say how to get the data from the same instance of the DAG execution.

I went through a similar question, How to pull xcom value from other task instance in the same DAG run (not the most recent one)?; however, the one solution given there is what I am already doing, and it does not seem to be the correct answer.

DAG definition:

dag = DAG(
    'daily_motor',
    default_args=default_args,
    schedule_interval='@daily'
)

#This task creates data in a BigQuery table based on execution date
extract_daily_data = BigQueryOperator(
    task_id='daily_data_extract',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    sql=policy_by_transaction_date_sql('{{ ds }}'), 
    destination_dataset_table='Test.daily_data_tmp',
    dag=dag)


get_daily_data = BigQueryGetDataOperator(
    task_id='get_daily_data',
    dataset_id='Test',
    table_id='daily_data_tmp',
    max_results='10000',
    dag=dag
)


#This is where I need to pull the data of the same execution date/same instance of DAG run not the most recent task run

def process_bq_data(**kwargs):
    bq_data = kwargs['ti'].xcom_pull(task_ids='get_daily_data')
    #This bq_data is most recent one not of the same execution date
    obj_creator = IibListToObject()
    items = obj_creator.create(bq_data, 'daily')
    save_daily_date_wise(items)


process_data = PythonOperator(
    task_id='process_data_from_bq',
    python_callable=process_bq_data,
    provide_context=True,
    dag=dag
)

get_daily_data.set_upstream(extract_daily_data)
process_data.set_upstream(get_daily_data)
Shiv
  • I am wondering about your statement - It has an upstream task get_daily_data of BigQueryGetDataOperator which fetches data based on execution_date. I do not see execution_date or {{ds}} either in the source_code for the operator or in the arguments passed to the operator. Are you sure it fetches data based on execution_date? – nightgaunt Jun 10 '19 at 11:58
  • @nightgaunt yes, it passes {{ ds }} in the sql parameter of the BigQueryOperator. What this operator does is fetch the data based on execution date and dump it into a temp table; in the next dependent task this data is extracted, which is basically BigQueryGetDataOperator. – Shiv Jun 21 '19 at 05:07
  • Can you try adding `include_prior_dates=False` to your xcom_pull function? If it does not work can you try `include_prior_dates=True` and share the output? – nightgaunt Jun 21 '19 at 05:22

1 Answer


You must be receiving the latest XCom value. You also need to make sure the values actually come from the same execution_date; this is controlled by the include_prior_dates parameter of xcom_pull:

:param include_prior_dates:

If False, only XComs from the current execution_date are returned. If True, XComs from previous dates are returned as well.
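To make the documented semantics concrete, here is a minimal in-memory illustration of how include_prior_dates filters XComs by execution_date. This is a hypothetical sketch, not the real Airflow XCom backend or API; the xcoms list and xcom_pull helper below are invented for illustration only:

```python
from datetime import date

# Hypothetical XCom store: one entry per (task, execution_date).
xcoms = [
    {'task_id': 'get_daily_data', 'execution_date': date(2019, 6, 1), 'value': 'old'},
    {'task_id': 'get_daily_data', 'execution_date': date(2019, 6, 2), 'value': 'current'},
]

def xcom_pull(task_id, execution_date, include_prior_dates=False):
    # include_prior_dates=False: only XComs from this run's execution_date.
    # include_prior_dates=True: this date and all earlier ones are eligible.
    if include_prior_dates:
        rows = [x for x in xcoms
                if x['task_id'] == task_id and x['execution_date'] <= execution_date]
    else:
        rows = [x for x in xcoms
                if x['task_id'] == task_id and x['execution_date'] == execution_date]
    # "The most recent XCom value from that task is returned."
    rows.sort(key=lambda x: x['execution_date'], reverse=True)
    return rows[0]['value'] if rows else None

print(xcom_pull('get_daily_data', date(2019, 6, 1)))  # -> old
print(xcom_pull('get_daily_data', date(2019, 6, 2)))  # -> current
```

With the default include_prior_dates=False, a backfill run for June 1 pulls 'old', not the value from the most recent run, which is the behavior the question is after.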

Enrique
  • Since it was causing issues only in backfill, and after putting in a lot of effort I could not find any concrete solution, I set the depends_on_past flag to True in the DAG definition parameters, which made the backfill runs execute sequentially. It slowed down execution, but only 4-5 of 365 execution instances had incorrect xcom_pull data, which I managed manually. – Shiv Aug 13 '19 at 17:27
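The workaround described in the comment above can be sketched as follows. This is an assumed default_args dict (the question does not show the original one; owner and start_date here are placeholders); only the depends_on_past key is the actual change:

```python
from datetime import datetime

# depends_on_past=True makes each task instance wait for its own previous
# run to succeed, so a backfill executes the runs sequentially instead of
# in parallel, reducing the chance of pulling another run's XCom.
default_args = {
    'owner': 'airflow',                 # placeholder; keep your own value
    'start_date': datetime(2019, 1, 1), # placeholder; keep your own value
    'depends_on_past': True,
}

# Passed to the DAG exactly as in the question:
# dag = DAG('daily_motor', default_args=default_args, schedule_interval='@daily')
```

Note this trades backfill speed for ordering, and per the comment it still left a handful of runs with incorrect xcom_pull data; the include_prior_dates parameter addresses the root cause more directly.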