
Code:

import datetime
import logging

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def hello_world(ti, execution_date, **context):
    logging.info("Hello World")
    return "Gorgeous"


def addition(ti, **context):
    # I want both lines below to log the same value
    logging.info(context['params']["please1"])
    logging.info(ti.xcom_pull(task_ids="hello_world"))


dag = DAG(
    "test",
    schedule_interval="@hourly",
    start_date=datetime.datetime.now() - datetime.timedelta(days=1),
)

t1 = PythonOperator(
    task_id="hello_world", python_callable=hello_world, dag=dag, provide_context=True
)
t2 = PythonOperator(
    task_id="abc",
    python_callable=addition,
    dag=dag,
    params={"please1": "{{{{ ti.xcom_pull(task_ids='{}') }}}}".format(t1.task_id)},
    provide_context=True,
)

t1 >> t2

I want both lines in addition() to log the same result:

    # I want both lines below to log the same value
    logging.info(context['params']["please1"])
    logging.info(ti.xcom_pull(task_ids="hello_world"))

But the result is:

[2021-05-17 23:47:15,286] {test_dag.py:14} INFO - {{ ti.xcom_pull(task_ids='hello_world') }}
[2021-05-17 23:47:15,291] {test_dag.py:15} INFO - Gorgeous

What I want to know: is it possible to use xcom_pull outside of the task function, e.g., when passing an XCom value as an argument to PythonOperator?

Thanks!

user3595632

1 Answer


Jinja-templated args for an operator can only be used for the fields that are listed as template_fields in the operator class. For the PythonOperator those are op_args, op_kwargs, and templates_dict; params is not one of them, so its value is passed through as a literal string. First, replace the params parameter with op_kwargs and remove the extra curly brackets so that the Jinja expression has only two on either side. The rendered op_kwargs are passed to the callable as keyword arguments, which is why addition() can read please1 from **context. Second, and unfortunately, you need to spell out the task_id explicitly in the ti.xcom_pull(task_ids='<task_id>') call.
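To illustrate why only template_fields get rendered, here is a toy model in plain Python (no Airflow required). ToyPythonOperator, render and render_fields are hypothetical names, and the regex stands in for Jinja by understanding only the single ti.xcom_pull expression used in this question; it is a sketch of the idea, not how Airflow actually renders templates.

```python
import re

# Hypothetical stand-in for Jinja: only understands "{{ ti.xcom_pull(task_ids='...') }}".
XCOM_EXPR = re.compile(r"\{\{\s*ti\.xcom_pull\(task_ids='([^']+)'\)\s*\}\}")


def render(value, xcoms):
    """Replace each supported expression in a string with the matching XCom value."""
    return XCOM_EXPR.sub(lambda m: str(xcoms[m.group(1)]), value)


class ToyPythonOperator:
    # Mirrors the idea of template_fields: only these attributes get rendered.
    template_fields = ("op_kwargs",)

    def __init__(self, op_kwargs=None, params=None):
        self.op_kwargs = op_kwargs or {}
        self.params = params or {}

    def render_fields(self, xcoms):
        # Render every attribute named in template_fields; everything else is untouched.
        for field in self.template_fields:
            original = getattr(self, field)
            setattr(self, field, {k: render(v, xcoms) for k, v in original.items()})


expr = "{{ ti.xcom_pull(task_ids='hello_world') }}"
op = ToyPythonOperator(op_kwargs={"please1": expr}, params={"please1": expr})
op.render_fields({"hello_world": "Gorgeous"})

print(op.op_kwargs["please1"])  # Gorgeous
print(op.params["please1"])     # {{ ti.xcom_pull(task_ids='hello_world') }}
```

The params value comes back unrendered, which matches the first log line in the question.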

Revised code:

import datetime
import logging

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def hello_world(ti, execution_date, **context):
    logging.info("Hello World")
    return "Gorgeous"


def addition(ti, **context):
    logging.info(context["please1"])
    logging.info(ti.xcom_pull(task_ids="hello_world"))


dag = DAG(
    "test",
    schedule_interval=None,
    start_date=datetime.datetime(2021, 5, 17),
    catchup=False,
)

with dag:
    t1 = PythonOperator(
        task_id="hello_world",
        python_callable=hello_world,
        provide_context=True,
    )

    t2 = PythonOperator(
        task_id="abc",
        python_callable=addition,
        op_kwargs={
            "please1": "{{ ti.xcom_pull(task_ids='hello_world') }}",
        },
        provide_context=True,
    )

    t1 >> t2
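The brace count depends on how the string is written. Python's str.format treats {{ and }} as escaped literal braces, so the question's four-brace .format(...) string already produced an ordinary two-brace Jinja expression; only a plain string literal, like the one in op_kwargs above, needs just two braces. A quick check in plain Python, with t1.task_id replaced by a literal:

```python
# In str.format, "{{{{" becomes "{{" and "}}}}" becomes "}}"; "{}" is filled with task_id.
task_id = "hello_world"
built = "{{{{ ti.xcom_pull(task_ids='{}') }}}}".format(task_id)

# The plain literal used in op_kwargs above needs no escaping.
hard_coded = "{{ ti.xcom_pull(task_ids='hello_world') }}"

print(built)                # {{ ti.xcom_pull(task_ids='hello_world') }}
print(built == hard_coded)  # True
```

Either way, the string reaching the operator is identical; what decides whether it is rendered is the field it is passed in.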

Logging from "t2": [screenshot not preserved]

If you are using Airflow 2.0, the code can be simplified with the new XComArg feature, which lets you access the output of a task with a simple task.output expression.
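Conceptually, task.output is a placeholder for "the return value of this task" that is resolved only when the downstream task runs. The toy model below sketches that idea in plain Python; ToyXComArg, ToyTask and the xcoms dict are hypothetical stand-ins, not Airflow's classes. As I understand it, Airflow 2.0 also infers the upstream dependency from an XComArg passed to an operator, which the toy mirrors with its upstream set.

```python
class ToyXComArg:
    """Hypothetical placeholder for the return value of `task`, resolved at run time."""

    def __init__(self, task):
        self.task = task

    def resolve(self, xcoms):
        return xcoms[self.task.task_id]


class ToyTask:
    def __init__(self, task_id, python_callable, op_kwargs=None):
        self.task_id = task_id
        self.python_callable = python_callable
        self.op_kwargs = op_kwargs or {}
        # Passing another task's .output records that task as upstream.
        self.upstream = {
            v.task.task_id for v in self.op_kwargs.values() if isinstance(v, ToyXComArg)
        }

    @property
    def output(self):
        return ToyXComArg(self)

    def execute(self, xcoms):
        # Swap placeholders for real values just before calling the function.
        kwargs = {
            k: v.resolve(xcoms) if isinstance(v, ToyXComArg) else v
            for k, v in self.op_kwargs.items()
        }
        xcoms[self.task_id] = self.python_callable(**kwargs)  # "push" the return value
        return xcoms[self.task_id]


t1 = ToyTask("hello_world", lambda: "Gorgeous")
t2 = ToyTask("abc", lambda please1: f"got {please1}", op_kwargs={"please1": t1.output})

xcoms = {}
t1.execute(xcoms)
print(t2.upstream)        # {'hello_world'}
print(t2.execute(xcoms))  # got Gorgeous
```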

Revised code for Airflow 2.0, using XComArg to pass the output of "t1" as the "please1" argument:

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator  # 2.0 module path; python_operator is deprecated


def hello_world(ti, execution_date, **context):
    logging.info("Hello World")
    return "Gorgeous"


def addition(ti, **context):
    logging.info(context["please1"])
    logging.info(ti.xcom_pull(task_ids="hello_world"))


dag = DAG(
    "test",
    schedule_interval=None,
    start_date=datetime.datetime(2021, 5, 17),
    catchup=False,
)

with dag:
    t1 = PythonOperator(
        task_id="hello_world",
        python_callable=hello_world,
    )

    t2 = PythonOperator(
        task_id="abc",
        python_callable=addition,
        op_kwargs={"please1": t1.output},
    )

    t1 >> t2  # optional: passing t1.output already sets this dependency

More about DAG authoring with 2.0 is in the Airflow 2.0 documentation.

Josh Fell
  • Just adding that since version 2.0.0 `provide_context` is deprecated, so there is no need to use it. There is a warning inside the `PythonOperator` code. – NicoE May 18 '21 at 12:09
  • @NicoE Nice catch. Edited the code sample accordingly. – Josh Fell May 18 '21 at 13:59
  • @JoshFell One more question: it looks like some people use Jinja in params other than the `template_fields` you mentioned: https://stackoverflow.com/a/56335683/3595632 . In that case, how is it possible to use Jinja in the `data` param? – user3595632 May 18 '21 at 21:44
  • @user3595632 For the `SimpleHttpOperator` in that example, the `data` parameter is a template field so Jinja templating is completely fine to use. I suspect you might be wondering how it's used for the `start` key in the `data` dict. It's because the entire `data` argument can be templated. Even though the entire `data` argument is not wholly within a Jinja expression, any part of that argument can be. Is this what you are asking? – Josh Fell May 18 '21 at 23:20
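The point about templating part of a `data` dict follows from how Airflow renders a templated field: it walks nested dicts, lists and tuples and renders every string it finds. The toy renderer below sketches that recursion in plain Python; render_nested and the example `data` values are hypothetical, and the regex again stands in for Jinja by handling only the ti.xcom_pull expression from this question.

```python
import re

# Hypothetical stand-in for Jinja: only understands "{{ ti.xcom_pull(task_ids='...') }}".
XCOM_EXPR = re.compile(r"\{\{\s*ti\.xcom_pull\(task_ids='([^']+)'\)\s*\}\}")


def render_nested(value, xcoms):
    """Walk dicts/lists/tuples and render every string found inside."""
    if isinstance(value, str):
        return XCOM_EXPR.sub(lambda m: str(xcoms[m.group(1)]), value)
    if isinstance(value, dict):
        return {k: render_nested(v, xcoms) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render_nested(v, xcoms) for v in value)
    return value  # numbers, None, etc. pass through unchanged


# Only some parts of this (hypothetical) data dict contain a template expression.
data = {
    "start": "{{ ti.xcom_pull(task_ids='hello_world') }}",
    "limit": 10,
    "tags": ["fixed", "{{ ti.xcom_pull(task_ids='hello_world') }}"],
}
print(render_nested(data, {"hello_world": "Gorgeous"}))
# {'start': 'Gorgeous', 'limit': 10, 'tags': ['fixed', 'Gorgeous']}
```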