
TLDR

In the Python callable for a SimpleHttpOperator response function, I am trying to push an XCom that combines information from two sources (a hash of the filename/path and an object lookup from a DB) to a specified key.

Longer Tale

I have a FileSensor written which grabs all new files and passes them to MultiDagRun to parallel-process the (scientific) information in the files as XCom. Works great. The SimpleHttpOperator POSTs filepath info to a submission API and receives back a task_id, which it must then use to read the result from another (slow-running) API. This I all have working fine: files get scanned, multiple DAGs launch to process them, and objects come back.

But... I cannot puzzle out how to push the result to an XCom inside the Python response function for the SimpleHttpOperator.

My Google-, SO-, and Reddit-fu has failed me here (and it seems overkill to use the PythonOperator, though that's my next stop). I notice a lot of people asking similar questions, though.

How do you use context or ti or task_instance or context['task_instance'] with the response function? (I cannot use the "Returned Value" XCom, as I need to distinguish the XCom keys for the parallel processing, AFAIK.) In the default_args I have context set to True.

I'm sure I am missing something simple here, but I'm stumped as to what it is. (Note: I did try the **kwargs and ti = kwargs['ti'] approach below as well before hitting SO.)


import json
import requests

def _handler_object_result(response, file):
    # Note: related to api I am calling not Airflow internal task ids
    header_result = response.json()
    task_id = header_result["task"]["id"]

    api = "https://redacted.com/api/task/result/{task_id}".format(task_id=task_id)
    resp = requests.get(api, verify=False).json()
    data = json.loads(resp["data"])
    file_object = json.dumps(data["OBJECT"])
    file_hash = hash(file)
    # This is the part that is not working as I am unsure how
    # to access the task instance to do the xcom_push
    ti.xcom_push(key=file_hash, value=file_object)
    if ti.xcom_pull(key=file_hash):
        return True
    else:
        return False
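(A side note on the key: Python's built-in hash() is randomized per interpreter process via PYTHONHASHSEED, so the same file path can hash differently on different Airflow workers. A hashlib digest is a stable alternative — a minimal sketch, with an illustrative stable_file_key helper:)

```python
import hashlib

def stable_file_key(file_path):
    # Built-in hash() is salted per process since Python 3.3, so it is
    # not a reliable XCom key across worker processes. A sha256 hex
    # digest of the path is deterministic everywhere.
    return hashlib.sha256(file_path.encode("utf-8")).hexdigest()

file_hash = stable_file_key("/data/incoming/sample.dat")
```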

and the Operator:

    object_result = SimpleHttpOperator(
        task_id="object_result",
        method='POST',
        data=json.dumps({"file": "{{ dag_run.conf['file'] }}", "keyword": "object"}),
        http_conn_id="coma_api",
        endpoint="/api/v1/file/describe",
        headers={"Content-Type": "application/json"},
        extra_options={"verify":False},
        response_check=lambda response: _handler_object_result(response, "{{ dag_run.conf['file'] }}"),
        do_xcom_push=False,
        dag=dag,
    )

I was really expecting the task_instance object to be available in some fashion, either by default or via configuration, but every variation that has worked elsewhere (FileSensor, PythonOperator, etc.) hasn't worked here, and I have been unable to google the magic words to make it accessible.

Daryl
2 Answers


You can try using the get_current_context() function in your response_check function:

import json
import requests

from airflow.operators.python import get_current_context

def _handler_object_result(response, file):
    # Note: related to api I am calling not Airflow internal task ids
    header_result = response.json()
    task_id = header_result["task"]["id"]

    api = "https://redacted.com/api/task/result/{task_id}".format(task_id=task_id)
    resp = requests.get(api, verify=False).json()
    data = json.loads(resp["data"])
    file_object = json.dumps(data["OBJECT"])
    file_hash = hash(file)

    ti = get_current_context()["ti"]  # <- Try this
    ti.xcom_push(key=file_hash, value=file_object)
    if ti.xcom_pull(key=file_hash):
        return True
    else:
        return False

That function is a handy way of accessing the task's execution context when the context isn't explicitly available, or when you don't want to pass context attributes around just to reach them deep in your logic stack.

Josh Fell
    This worked fantastically. Thank you @JoshFell! I have *never* run across get_current_context() mentioned in any of the googled posts or even tutorials I've seen (not sure if that's because a lot of stuff changed in 2.x). It was dead easy to use the way I wanted. Saved me a bundle of thrash time. Thanks!! – Daryl Feb 15 '23 at 10:20

If you use the handler function directly instead of putting it in a lambda, it should work with the usual **context trick (or **kwargs as you mentioned):

e.g. use this

response_check=_handler_object_result,

with

def _handler_object_result(response, **context):
    ti = context["ti"]
    file = context["dag_run"].conf["file"]
    ### rest of the code

Essentially, your lambda function does not accept the context kwargs, so even if you add **kwargs/**context to your handler function, the handler will never see them; the lambda in between only forwards response.
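To see why the lambda is the problem, here is a dependency-free sketch (fake_operator_call below only imitates an operator passing context as keyword arguments; it is not Airflow's exact call signature):

```python
def handler_with_context(response, **context):
    # Accepts, and can inspect, whatever keyword args the caller passes.
    return response, sorted(context)

# A lambda declaring only `response` cannot receive extra kwargs at all.
narrow = lambda response: ("ok", response)

def fake_operator_call(check):
    # Imitation of an operator invoking response_check with context
    # kwargs (illustrative only, not Airflow's real internals).
    try:
        return check("RESPONSE", ti="task-instance", dag_run="run")
    except TypeError:
        return "TypeError: unexpected keyword arguments"

print(fake_operator_call(handler_with_context))  # handler sees the context
print(fake_operator_call(narrow))                # lambda raises TypeError
```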


Note that the way you're currently templating the response_check kwarg wouldn't work anyway, because response_check is not a templated field (see this); the operator's template_fields consists only of ('endpoint', 'data', 'headers'). That's why I've put the file variable inside the function and taken it from the task context.
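To illustrate the template_fields mechanic without pulling in Airflow or Jinja, here is a rough stand-in using the stdlib's string.Template ($-style placeholders instead of Jinja; FakeOperator and its render_templates method are invented for this sketch):

```python
from string import Template

class FakeOperator:
    # Only attributes named in template_fields get rendered -- the same
    # rule Airflow applies, sketched here with $-style stdlib templates
    # rather than Jinja.
    template_fields = ("endpoint", "data", "headers")

    def __init__(self, **attrs):
        self.__dict__.update(attrs)

    def render_templates(self, context):
        for name in self.template_fields:
            value = getattr(self, name, None)
            if isinstance(value, str):
                setattr(self, name, Template(value).safe_substitute(context))

op = FakeOperator(
    data="file=$file",            # listed in template_fields: rendered
    response_check="file=$file",  # not listed: left as a literal string
)
op.render_templates({"file": "sample.dat"})
```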

mck