I am trying to write some Airflow integration tests in which I imitate the `gcs_list_operator` by returning a file list from a custom PythonOperator. The list is then passed to a BranchPythonOperator through XCom.
When running the code locally, I am struggling to pull the value returned by the previous PythonOperator task into the branch operator without using a key. Pushing and pulling with an explicit key looks like this:

```python
kwargs['ti'].xcom_push(key="dummyKey", value=value)
kwargs['ti'].xcom_pull(key='dummyKey', task_ids='push_to_xcoms')
```
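As a rough mental model, here is a toy in-memory stand-in for the XCom table. It is not Airflow's implementation, and the class `ToyXComStore` is hypothetical. It assumes only two documented behaviours: XComs are stored per DAG, task, execution date and key, and `xcom_pull` called without a `key` looks for the key `return_value` (Airflow's `XCOM_RETURN_KEY`), which is where a callable's return value ends up.

```python
from datetime import datetime

# Key Airflow uses for a task's return value (XCOM_RETURN_KEY)
RETURN_KEY = "return_value"


class ToyXComStore:
    """Hypothetical stand-in: rows keyed by (dag_id, task_id, execution_date, key)."""

    def __init__(self):
        self._rows = {}

    def push(self, dag_id, task_id, execution_date, key, value):
        self._rows[(dag_id, task_id, execution_date, key)] = value

    def pull(self, dag_id, task_id, execution_date, key=RETURN_KEY):
        # Omitting the key means "the return value"; a missing row gives None
        return self._rows.get((dag_id, task_id, execution_date, key))


store = ToyXComStore()
run_date = datetime(2018, 1, 1)
store.push("example_dag", "push_to_xcoms", run_date, "dummyKey", ["a", "b", "c"])

print(store.pull("example_dag", "push_to_xcoms", run_date, key="dummyKey"))  # ['a', 'b', 'c']
print(store.pull("example_dag", "push_to_xcoms", run_date))  # None: nothing under 'return_value'
```

In this model, pulling without a key only succeeds if something was stored under `return_value` for the same task and execution date.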
The code below illustrates what I am trying to do: return a list and pull it without declaring a key. Under the Airflow scheduler it works correctly and prints the list. However, when I run it in my IDE, the list isn't pulled or printed. Can anyone advise me on this?
```python
from datetime import datetime

from airflow import DAG
from airflow.models.taskinstance import TaskInstance
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='example_dag',
    start_date=datetime(2018, 1, 1),
    schedule_interval='@once',
)


def push_function(**kwargs):
    # The return value is meant to become push_task's XCom (no explicit key)
    ls = ['a', 'b', 'c']
    return ls


push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    provide_context=True,
    dag=dag,
)


def pull_function(**kwargs):
    ti = kwargs['ti']
    # No key given, so xcom_pull looks for push_task's return value
    ls = ti.xcom_pull(task_ids='push_task')
    print(ls)


pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag,
)

push_task >> pull_task

# Run the push task locally
push_to_xcoms_ti = TaskInstance(task=push_task, execution_date=datetime.now())
context = push_to_xcoms_ti.get_template_context()
push_task.execute(context)

# Run the pull task locally (stands in for the branch operator)
check_content_ti = TaskInstance(task=pull_task, execution_date=datetime.now())
context = check_content_ti.get_template_context()
pull_task.execute(context)
```
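Two Airflow details seem relevant to the local run, sketched below as a toy pure-Python model. The names `execute`, `run`, `pull` and the `xcoms` dict are hypothetical and are not Airflow's API. First, in Airflow 1.10 a callable's return value is written to XCom by the `TaskInstance` when it runs the task (it calls `execute()` and then pushes a non-None result under `return_value`); calling `operator.execute()` directly only returns the value. Second, `xcom_pull` by default only matches rows for the pulling task instance's own execution date (`include_prior_dates=False`), whereas the script above calls `datetime.now()` separately for each `TaskInstance`.

```python
from datetime import datetime

RETURN_KEY = "return_value"
xcoms = {}  # toy XCom table keyed by (task_id, execution_date, key)


def push_function(**kwargs):
    return ["a", "b", "c"]


def execute(callable_, context):
    # Like calling PythonOperator.execute() directly: just returns the result
    return callable_(**context)


def run(task_id, callable_, context):
    # Like the task-run path: execute, then store a non-None result
    # under the 'return_value' key for this execution date
    result = execute(callable_, context)
    if result is not None:
        xcoms[(task_id, context["execution_date"], RETURN_KEY)] = result
    return result


def pull(task_id, execution_date, key=RETURN_KEY):
    # Only rows for the exact execution date are visible
    return xcoms.get((task_id, execution_date, key))


date_a = datetime(2018, 1, 1)
execute(push_function, {"execution_date": date_a})
print(pull("push_task", date_a))  # None: execute() stored nothing

run("push_task", push_function, {"execution_date": date_a})
print(pull("push_task", date_a))  # ['a', 'b', 'c']
print(pull("push_task", datetime(2018, 1, 2)))  # None: different execution date
```

Under these assumptions, a local test would need both tasks to share one execution date and the push task's result to actually be stored under `return_value`.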