In Airflow, I am writing a DAG with several tasks, all PythonOperators so far. From t1 I want to store a variable in the XCom dictionary, and then inside the function for t2 I would like to access that variable without explicitly referring to the task name (which would require hardcoding the task name in the t2 function). So my plan was to access context['ti'] and use _get_previous_ti(), whose result has the attribute task_id. This looks like what I want, but it is definitely not working for me.

I tried this:

from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

def task1(**context):
    return 'TASK 1 RESULT'

def task2(**context):
    previous_ti = context['ti']._get_previous_ti()
    print("Previous TI: ", previous_ti)
    previous_ti_id = previous_ti.task_id
    print("Previous task_id: ", previous_ti_id)

    # use previous_ti_id to access context['ti'].xcom_pull(task_ids=previous_ti_id)
    return

default_args = {
    "owner": "me",
    "start_date": days_ago(1)
}

dag = DAG(
    dag_id='some_test',
    default_args=default_args,
    schedule_interval=None)


with dag:

    t1 = PythonOperator(
        task_id = "task_1_testing",
        python_callable=task1,
        provide_context=True)

    t2 = PythonOperator(
        task_id = "task_2_testing",
        python_callable=task2,
        provide_context=True)
    
    t1 >> t2

But this gives strange results: when I first tested it, Airflow had already been running, and it seemed to be referencing the task instance of a previously triggered DAG run (?). When I exited Airflow and restarted it with this code, it gave me the explosion screen with the error: AttributeError: 'NoneType' object has no attribute 'dag_id'.

All I really want here is a variable in the t2 function called previous_ti_id, which in this example would equal task_1_testing. Is this possible?

I found this previous question, but I do not understand enough about Airflow to determine whether it is relevant (it doesn't look like it, though). I would appreciate some assistance with this.

fffrost
  • @y2k-shubham Pretty much, but as my comment to the posted answer in this thread, I'd like to understand how to just extract the string itself. – fffrost Sep 24 '20 at 13:20
  • The following works: `parent_task_ids = list(context['task'].upstream_task_ids)`, and then I get what I want with: `previous_task_name = context['ti'].xcom_pull(parent_task_ids[0])`. But I wonder what happens when we have >1 previous task, and whether this is a correct approach. – fffrost Sep 24 '20 at 14:02
  • Yes, precisely; that's why it's PLURAL `parent_task_ids`. Which *parent* `task`'s ID you pick in that case would depend on your application logic [be sure to have this factored in somewhere, or else you'll be in for surprises / unexpected behaviour the day you run into the multi-parent case] – y2k-shubham Sep 24 '20 at 14:04

1 Answer

The function _get_previous_ti() returns the previous task instance, which is the instance of the same task from the previous DAG run, not the upstream task in the current run. What you are looking for are the upstream task IDs, and it should be possible to get these from the task via upstream_list (the upstream task objects) or upstream_task_ids (their IDs as strings).
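As a minimal sketch of that idea, the callable below reads the upstream IDs from context['task'] and pulls each upstream return value from XCom. The function name pull_upstream_results is hypothetical, and the sketch assumes the context contains the 'task' and 'ti' entries (as it does with provide_context=True in the question's setup) and that each upstream task pushed its result by returning it.

```python
def pull_upstream_results(**context):
    """Collect the XCom return value of every directly upstream task.

    Hypothetical helper: intended to be passed as python_callable to a
    PythonOperator, so that no upstream task name is hardcoded.
    """
    task = context["task"]  # the operator that is currently executing
    ti = context["ti"]      # the current task instance

    # upstream_task_ids holds plain strings; sort them so the order is stable
    # when there is more than one parent.
    upstream_ids = sorted(task.upstream_task_ids)

    # Pull each parent's return value, keyed by the parent's task_id.
    return {
        upstream_id: ti.xcom_pull(task_ids=upstream_id)
        for upstream_id in upstream_ids
    }
```

With the DAG from the question, using this as the callable for t2 would be expected to yield a dictionary with the single key 'task_1_testing'. Keying the results by task ID keeps the multi-parent case explicit instead of relying on list position.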

Maybe this post also helps you.

Philipp Johannis
  • Thank you very much for this. Just one question - what is the best way to extract this as a string? Using `context['task'].upstream_list[0]` returns the task object rather than its ID. I just want to extract the `'task_1_testing'` from this, and I'm not sure exactly what is going on in the code `parent_task_ids: List[str] = my_task.upstream_task_ids` or if it's really necessary to do it this way? – fffrost Sep 24 '20 at 13:19
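To illustrate the difference raised in the comment above, here is a small sketch of two ways to get the upstream names as plain strings. Both function names are hypothetical; the sketch assumes only that each object in upstream_list exposes a task_id attribute and that upstream_task_ids is an iterable of strings.

```python
def ids_from_upstream_list(task):
    """Read the task_id string off each upstream task object."""
    # upstream_list contains operator objects, so take .task_id from each one.
    return sorted(upstream.task_id for upstream in task.upstream_list)


def ids_from_upstream_task_ids(task):
    """Use upstream_task_ids, which already contains the ID strings."""
    # Sorting converts the collection to a list with a predictable order.
    return sorted(task.upstream_task_ids)
```

Inside a callable, either function could be called with context['task']. The second avoids touching the task objects at all, which is presumably why the linked snippet uses upstream_task_ids.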