Relating to this earlier question, suppose that we have an Apache Airflow DAG that comprises two tasks, first an HTTP request (i.e., SimpleHTTPOperator) and then a PythonOperator that does something with the response of the first task.
Conveniently, using the Dog CEO API as an example, consider the following DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['someone@email.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1),
}
with DAG(
'dog_api',
default_args=default_args,
description='Get nice dog pics',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['dog'],
) as dag:
get_dog = SimpleHttpOperator(
task_id='get_dog',
http_conn_id='dog_api', # NOTE: set up an HTTP connection called 'dog_api' with host 'https://dog.ceo/api'
endpoint='/breeds/image/random',
method="GET",
# xcom_push=True # NOTE: no such argument in 2.2.0 but sometimes suggested by older guides online
)
def xcom_check(ds, **kwargs):
val = kwargs['ti'].xcom_pull(key='return_value', task_ids='get_dog')
return f"xcom_check has: {kwargs['ti']} and it says: {val}"
inspect_dog = PythonOperator(
task_id='inspect_dog',
python_callable=xcom_check,
provide_context=True
)
We'd like to access the return value of get_dog
inside xcom_check
. By inspecting the logs, get_dog
populates the xcom storage nicely to something like:
But now, this is not currently passed to the second task. This can be seen by inspecting the logs as well, which says (among other things):
*redacted* Returned value was: xcom_check has: <TaskInstance: dog_api.inspect_dog manual__2021-10-30T16:27:23.081539+00:00 [running]> and it says: None
So obviously, the task instance is "dog_api.inspect_dog" but we'd want it to be "dog_api.get_dog". How is this done? At the time of writing, the same question is asked in the comments of the previous question, upvoted, but unanswered. I also tried adapting this answer but can't figure out what I'm still doing differently.