I am trying to write some Airflow integration tests in which I imitate the gcs_list_operator by returning a file list from a custom PythonOperator. The list is then passed to a BranchPythonOperator through XCom.

When running the code locally, I am struggling to pull the value returned by the previous PythonOperator task into the branch operator without using a key. Pushing and pulling with an explicit key does work:

kwargs['ti'].xcom_push(key="dummyKey", value=value)

kwargs['ti'].xcom_pull(key='dummyKey', task_ids='push_to_xcoms')

The code below illustrates what I am trying to do: return a list from one task and pull it in the next without declaring a key. Under the Airflow scheduler it works correctly and prints the list. However, when I run it in my IDE, the list is not pulled or printed. Can anyone advise me on this?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow.models.taskinstance import TaskInstance

DAG = DAG(
  dag_id='example_dag',
  start_date=datetime(2018, 1, 1),
  schedule_interval='@once'
)

def push_function(**kwargs):
    ls = ['a', 'b', 'c']
    return ls

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    ls = ti.xcom_pull(task_ids='push_task')
    print(ls)

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task

# push context
push_to_xcoms_ti = TaskInstance(task=push_task, execution_date=datetime.now())
context = push_to_xcoms_ti.get_template_context()
push_task.execute(context)

# branch operator pull from context
check_content_ti = TaskInstance(task=pull_task, execution_date=datetime.now())
context = check_content_ti.get_template_context()
pull_task.execute(context)
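
One detail in the snippet above that may matter: each TaskInstance is built with its own `datetime.now()` call, so the push side and the pull side carry different execution dates. Assuming XCom lookups are scoped to the task id, the execution date, and the key (which is my understanding of the default behaviour), a pull under a different date would find nothing. The sketch below is a toy model of that lookup in plain Python, not Airflow's implementation; `xcom_store`, `xcom_push` and `xcom_pull` here are hypothetical stand-ins.

```python
from datetime import datetime, timedelta

# Toy stand-in for the XCom table, keyed by (task_id, execution_date, key).
# Illustrative model only; these helpers are not Airflow functions.
xcom_store = {}


def xcom_push(task_id, execution_date, key, value):
    """Store a value under the full (task, date, key) triple."""
    xcom_store[(task_id, execution_date, key)] = value


def xcom_pull(task_id, execution_date, key="return_value"):
    """Look up a value; a different execution date means a different row."""
    return xcom_store.get((task_id, execution_date, key))


# Two timestamps standing in for two separate datetime.now() calls.
push_date = datetime(2018, 1, 1, 12, 0, 0)
pull_date = push_date + timedelta(milliseconds=5)

xcom_push("push_task", push_date, "return_value", ["a", "b", "c"])

mismatched = xcom_pull("push_task", pull_date)  # dates differ: nothing found
matched = xcom_pull("push_task", push_date)     # same date: the list comes back

print(mismatched, matched)
```

If this model holds, creating one `execution_date` value and reusing it for both TaskInstance objects would remove that source of mismatch in a local test.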
y2k-shubham
jmarlow
  • Could you please elaborate on what you mean by `..running in my IDE..`? Do understand that `XCOM`s can be pulled only when the DAG is actually triggered by Airflow, since pulling requires the runtime `context` dictionary (which you are using in the form of `kwargs`), and that dictionary becomes available only when `task`s are actually executed (not when the DAG file is parsed by Airflow). Read [this](https://stackoverflow.com/q/54745555/3679900) – y2k-shubham Aug 14 '20 at 16:50
  • I was working from [this](https://medium.com/@chandukavar/testing-in-airflow-part-2-integration-tests-and-end-to-end-pipeline-tests-af0555cd1a82) blog post, where I create task instances [(a combination of a DAG, a task, and a point in time)](https://airflow.readthedocs.io/en/stable/concepts.html) with TaskInstance and run the tasks with execute(). In my IDE, XCom communication works at runtime when I push values with a particular key. However, it does not work when I return something directly from `push_function`. So I am trying to understand what the difference is here. – jmarlow Aug 19 '20 at 08:50
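
On the difference raised in the last comment, here is a minimal sketch of one possible explanation. It assumes that the operator's `execute()` only returns the callable's result, and that the code which runs a task instance is what stores that result under the default key `return_value`. The class and functions below (`ToyTaskInstance`, `toy_execute`, `toy_run`) are hypothetical plain-Python stand-ins, not Airflow APIs.

```python
XCOM_RETURN_KEY = "return_value"  # default key used when no key is given


class ToyTaskInstance:
    """Hypothetical stand-in for a task instance with a private XCom dict."""

    def __init__(self):
        self.xcoms = {}

    def xcom_push(self, key, value):
        self.xcoms[key] = value

    def xcom_pull(self, key=XCOM_RETURN_KEY):
        return self.xcoms.get(key)


def push_with_key(ti):
    # The callable stores the value itself, so no runner is needed.
    ti.xcom_push(key="dummyKey", value=["a", "b", "c"])


def push_by_return(ti):
    # The callable only returns; something else has to store the value.
    return ["a", "b", "c"]


def toy_execute(python_callable, ti):
    """Models calling execute() directly: run the callable, store nothing."""
    return python_callable(ti)


def toy_run(python_callable, ti):
    """Models a full task run: execute, then push any non-None result."""
    result = toy_execute(python_callable, ti)
    if result is not None:
        ti.xcom_push(key=XCOM_RETURN_KEY, value=result)
    return result


keyed = ToyTaskInstance()
toy_execute(push_with_key, keyed)
keyed_value = keyed.xcom_pull(key="dummyKey")  # found: pushed explicitly

bare = ToyTaskInstance()
toy_execute(push_by_return, bare)
bare_value = bare.xcom_pull()  # not found: nobody pushed the return value

full = ToyTaskInstance()
toy_run(push_by_return, full)
full_value = full.xcom_pull()  # found: the runner pushed it

print(keyed_value, bare_value, full_value)
```

Under that assumption, a local test could either run the task through the task instance instead of calling `execute()` directly, or push the value returned by `execute()` manually under `return_value` before pulling it.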
