Relating to this earlier question, suppose that we have an Apache Airflow DAG that comprises two tasks: first an HTTP request (i.e., SimpleHttpOperator) and then a PythonOperator that does something with the response of the first task.

Conveniently, using the Dog CEO API as an example, consider the following DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['someone@email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}
with DAG(
    'dog_api',
    default_args=default_args,
    description='Get nice dog pics',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['dog'],
) as dag:
    get_dog = SimpleHttpOperator(
        task_id='get_dog',
        http_conn_id='dog_api', # NOTE: set up an HTTP connection called 'dog_api' with host 'https://dog.ceo/api'
        endpoint='/breeds/image/random',
        method="GET",
        # xcom_push=True # NOTE: no such argument in 2.2.0 but sometimes suggested by older guides online
    )
    
    def xcom_check(ds, **kwargs):
        val = kwargs['ti'].xcom_pull(key='return_value', task_ids='get_dog')
        return f"xcom_check has: {kwargs['ti']} and it says: {val}"
     
    inspect_dog = PythonOperator(
        task_id='inspect_dog',
        python_callable=xcom_check,
        provide_context=True
    )

We'd like to access the return value of get_dog inside xcom_check. Judging by the logs, get_dog populates the XCom storage as expected:

[screenshot: xcom value]

However, this value is not passed to the second task. This can also be seen in the logs, which say (among other things):

*redacted* Returned value was: xcom_check has: <TaskInstance: dog_api.inspect_dog manual__2021-10-30T16:27:23.081539+00:00 [running]> and it says: None

So obviously, the task instance is "dog_api.inspect_dog" but we'd want it to be "dog_api.get_dog". How is this done? At the time of writing, the same question is asked in the comments of the previous question, upvoted, but unanswered. I also tried adapting this answer but can't figure out what I'm still doing differently.

Juho

1 Answer

Your problem is that you did not set a dependency between the tasks, so inspect_dog may run before, or in parallel with, get_dog. When this happens, inspect_dog sees no XCom value because get_dog hasn't pushed it yet.

You just need to set dependency as:

get_dog >> inspect_dog

Log:

[2021-10-31, 07:07:21 UTC] {python.py:174} INFO - Done. Returned value was: xcom_check has: <TaskInstance: dog_api.inspect_dog manual__2021-10-31T07:05:27.721051+00:00 [running]> and it says: {"message":"https:\/\/images.dog.ceo\/breeds\/pointer-germanlonghair\/hans1.jpg","status":"success"}
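Note that the value pulled from XCom in the log above is the response body as text, not a dict, so the Python task would usually parse it before using it. Here is a minimal sketch in plain Python, assuming the payload has the shape shown in the log; `extract_image_url` is a hypothetical helper name, not part of Airflow:

```python
import json

# Raw XCom value copied from the log above (the response body as text).
raw = r'{"message":"https:\/\/images.dog.ceo\/breeds\/pointer-germanlonghair\/hans1.jpg","status":"success"}'


def extract_image_url(xcom_value):
    """Hypothetical helper: parse the JSON text and return the image URL."""
    payload = json.loads(xcom_value)
    # Assumption: the API reports success via a "status" field, as in the log.
    if payload.get("status") != "success":
        raise ValueError(f"unexpected API status: {payload.get('status')!r}")
    return payload["message"]


image_url = extract_image_url(raw)
print(image_url)
```

Inside xcom_check, something like `extract_image_url(val)` could then be called on the pulled value once the dependency is in place.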

As for your comment in the code about xcom_push: the xcom_push parameter was used in older Airflow versions and was replaced by do_xcom_push (see source code). Note that the default value of this parameter is True, so the response is pushed to XCom without any extra argument.

Elad Kalif