In Airflow, I am writing a DAG with several tasks, all PythonOperators so far. From t1 I want to store a variable in the XCom dictionary. Then, inside the function for t2, I would like to access that variable without explicitly naming the upstream task, because that would require hardcoding the task name in the t2 function. My plan was to take context['ti'] and call its _get_previous_ti() method, which returns an object with a task_id attribute. This looks like what I want, but it is definitely not working for me.
I tried this:
from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

def task1(**context):
    return 'TASK 1 RESULT'

def task2(**context):
    previous_ti = context['ti']._get_previous_ti()
    print("Previous TI: ", previous_ti)
    previous_ti_id = previous_ti.task_id
    print("Previous task_id: ", previous_ti_id)
    # use previous_ti_id to access context['ti'].xcom_pull(task_ids=previous_ti_id)
    return

default_args = {
    "owner": "me",
    "start_date": days_ago(1)
}

dag = DAG(
    dag_id='some_test',
    default_args=default_args,
    schedule_interval=None)

with dag:
    t1 = PythonOperator(
        task_id="task_1_testing",
        python_callable=task1,
        provide_context=True)
    t2 = PythonOperator(
        task_id="task_2_testing",
        python_callable=task2,
        provide_context=True)
    t1 >> t2
But this gives strange results. When I first tested it, Airflow was already running, and it seemed to reference a task instance from a previously triggered DAG run (?). When I stopped Airflow and restarted it with this code, I got the "explosion" error page with: AttributeError: 'NoneType' object has no attribute 'dag_id'.
All I really want is a variable in the t2 function called previous_ti_id which, in this example, would be equal to task_1_testing. Is this possible?
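One possible direction, offered as a hedged sketch rather than a confirmed fix: _get_previous_ti() is a private method that appears to look up the instance of the *same* task in an earlier DAG run, not the upstream task. That would be consistent with both symptoms above (an old run being referenced, then None once no earlier run existed). The upstream task IDs can instead be read from the operator itself via context['task'].upstream_task_ids, and each one passed to ti.xcom_pull(task_ids=...). This assumes the Airflow 1.x context that provide_context=True supplies, including the 'task' and 'ti' keys. FakeTI and fake_context below are hypothetical stand-ins so the callable can be exercised without a running Airflow.

```python
from types import SimpleNamespace


def task2(**context):
    """Pull the return value of every direct upstream task without hardcoding task IDs.

    Assumes Airflow passes the operator as context["task"] and the
    TaskInstance as context["ti"] (as with provide_context=True in Airflow 1.x).
    """
    task = context["task"]
    ti = context["ti"]
    # upstream_task_ids holds the task_ids directly upstream of this task;
    # sort them so the iteration order is deterministic.
    upstream_ids = sorted(task.upstream_task_ids)
    results = {}
    for previous_ti_id in upstream_ids:
        # With a single task_id string, xcom_pull returns that task's
        # "return_value" XCom, i.e. whatever its python_callable returned.
        results[previous_ti_id] = ti.xcom_pull(task_ids=previous_ti_id)
    print("Upstream results:", results)
    return results


# --- Hypothetical stand-ins for local testing only (not Airflow classes) ---
class FakeTI:
    """Mimics only the xcom_pull call used above, backed by a plain dict."""

    def __init__(self, store):
        self._store = store

    def xcom_pull(self, task_ids):
        return self._store.get(task_ids)


fake_context = {
    "task": SimpleNamespace(upstream_task_ids={"task_1_testing"}),
    "ti": FakeTI({"task_1_testing": "TASK 1 RESULT"}),
}

if __name__ == "__main__":
    task2(**fake_context)
```

With a single upstream task, as in this DAG, previous_ti_id would simply be the only element of sorted(context["task"].upstream_task_ids). Returning a dict keeps the sketch usable when t2 has several upstream tasks.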
I found this previous question, but I do not understand Airflow well enough to tell whether it is relevant (it doesn't look like it, though). I would appreciate some help with this.