
I have 2 tasks. In the first, a PythonOperator computes something, and in the second I want to use the PythonOperator's output in an HTTP operator. Here is my code:

source_list = ['account', 'sales']

for source_type in source_list:
    t2 = PythonOperator(
                task_id='compute_next_gather_time_for_' + source_type,
                python_callable=compute_next_gather_time,
                provide_context=True,
                trigger_rule=TriggerRule.ALL_SUCCESS,
                op_args=[source_type],
                retries=3
            )

    t3 = SimpleHttpOperator(
                task_id='request_' + source_type + '_report',
                method='POST',
                http_conn_id='abc',
                endpoint=endpoint,
                data=json.dumps({
                    "query": {
                        "start": "{{ task_instance.xcom_pull(task_ids='prev_task_id') }}",
                        "stop": str(yesterday),
                        "fields": [
                            1
                        ]
                    }
                }),
                headers={"Content-Type": "application/json", "Authorization": 'abc'},
                response_check=lambda response: len(response.json()) == 0,
                log_response=True,
                retries=3
            )

Query: I want to pass the previous task's id into t3's `data` variable. I am not sure how to do that, since t2's task_id is not constant: it changes with the source_type. When I tried, the template evidently did not render.

Gagan
  • Does this answer your question? [Apache Airflow - get all parent task\_ids](https://stackoverflow.com/questions/54728513/apache-airflow-get-all-parent-task-ids) – y2k-shubham Sep 24 '20 at 13:11
  • @y2k-shubham this solves the problem only for the base operator, not for the python operator – Manaslu Aug 24 '22 at 11:15
  • **@Manaslu** it should. `BaseOperator` is the parent class of all operators -> so any functions exposed by it are naturally inherited by every Airflow operator (that's the crux of `inheritance` in object-oriented programming) – y2k-shubham Aug 24 '22 at 12:13

3 Answers


I was able to get it by doing this:

next(iter(context['task'].upstream_task_ids))
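To see what this one-liner does, here is a minimal, Airflow-free sketch: the `context['task']` object and its `upstream_task_ids` set are stubbed with a `SimpleNamespace` purely for illustration.

```python
from types import SimpleNamespace

def pick_upstream_id(**context):
    # upstream_task_ids is a set; take an arbitrary (here: the only) element
    return next(iter(context["task"].upstream_task_ids))

# Stub standing in for the task object Airflow places in the context
fake_task = SimpleNamespace(upstream_task_ids={"compute_next_gather_time_for_account"})
print(pick_upstream_id(task=fake_task))  # compute_next_gather_time_for_account
```

In a real DAG, Airflow supplies `context['task']` to the python_callable; nothing needs to be stubbed.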
cosbor11

I haven't used Jinja templating in any of my DAGs before, but I have faced similar problems where I needed to retrieve XCom values from a task with a dynamically generated task_id.

You could define the task_ids in T3 in the same way you defined the task_id in T2. For example:

source_list = ['account', 'sales']

for source_type in source_list:

    task_id = 'compute_next_gather_time_for_' + source_type

    t2 = PythonOperator(
                task_id=task_id,
                python_callable=compute_next_gather_time,
                provide_context=True,
                trigger_rule=TriggerRule.ALL_SUCCESS,
                op_args=[source_type],
                retries=3
            )

    t3 = SimpleHttpOperator(
                task_id='request_' + source_type + '_report',
                method='POST',
                http_conn_id='abc',
                endpoint=endpoint,
                data=json.dumps({
                    "query": {
                        "start": "{{ task_instance.xcom_pull(task_ids=task_id) }}",
                        "stop": str(yesterday),
                        "fields": [
                            1
                        ]
                    }
                }),
                headers={"Content-Type": "application/json", "Authorization": 'abc'},
                response_check=lambda response: len(response.json()) == 0,
                log_response=True,
                retries=3
            )
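Note that a plain Python variable is not visible inside a Jinja string, so the dynamic `task_id` has to be spliced into the template as literal text. A standalone sketch of the string that ends up being rendered (plain Python, no Airflow needed):

```python
source_type = "account"
task_id = "compute_next_gather_time_for_" + source_type

# Splice the Python variable into the Jinja template as literal text;
# the {{ ... }} part is rendered later by Airflow, not by Python.
start_template = "{{ task_instance.xcom_pull(task_ids='" + task_id + "') }}"
print(start_template)
# {{ task_instance.xcom_pull(task_ids='compute_next_gather_time_for_account') }}
```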
Josh
    Thanks buddy. It worked. I changed 'data' into this and it worked fine. Instead of creating a variable, i just used it directly in jinja. data=json.dumps({ "query": { "start": "{{ task_instance.xcom_pull(task_ids='" + 'compute_next_gather_time_for_' + source_type + "') }}", "stop": str(yesterday), "fields": [ 1 ] } }), – Gagan Jun 04 '19 at 23:15
  • @Gagan, can you please add this as a new answer, since your answer shows how to implement the advice? – cdabel Apr 22 '20 at 19:05

To elaborate a bit on @cosbor11's answer.

The approach uses the Airflow task object extracted from the keyword arguments supplied by Airflow during a DAG run. This dictionary was once referred to as the context, and PythonOperator had a `provide_context` argument, but that argument is deprecated now, I believe; the context is always provided, making `task`, the task instance, and other objects and attributes available.

So we can pull things like `upstream_task_ids` from the `task` object. It is a set, so grab one element with an iterator or convert it to a list.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

def my_python_callable(**context):
    # upstream_task_ids is a set of the task_ids of all direct upstream tasks
    upstream_id = next(iter(context['task'].upstream_task_ids))
    upstream_ids = context['task'].upstream_task_ids
    print(f"got upstream task_id from the task object in the Airflow-provided context: {upstream_id} from {upstream_ids}")

with DAG('silly_hats', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    task0 = DummyOperator(task_id='my_spoons_too_big')
    task1 = PythonOperator(task_id='i_am_a_banana', python_callable=my_python_callable)
    task0 >> task1  # without this dependency, task1 has no upstream task_ids
Connor Dibble