
I recently started using Apache Airflow. I am using the TaskFlow API with one decorated task with id `Get_payload`, plus a `SimpleHttpOperator`. Task `Get_payload` gets data from a database, does some data manipulation, and returns a dict as the payload.

Problem

I am unable to pass data from the previous task into the next task. Yes, I am aware of XComs, but the whole purpose of using the TaskFlow API is to avoid direct interactions with XComs. I get the error below when `get_data` is passed directly to the `data` property of `SimpleHttpOperator`.

airflow.exceptions.AirflowException: 400:BAD REQUEST

What have I tried so far?

As mentioned in this SO answer, I used `template_fields` in my custom sensor to declare the field in which to expect the data from the previous task. In the case of `SimpleHttpOperator` I cannot edit the operator class to do the same. So how can this be solved similarly for `SimpleHttpOperator`?
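
For reference, the custom sensor does something like this (simplified, with illustrative names rather than my actual code):

from airflow.sensors.base import BaseSensorOperator


class PayloadSensor(BaseSensorOperator):
    # Fields listed in template_fields are rendered by Jinja at runtime,
    # so the output of an upstream TaskFlow task can be passed in
    # directly and is resolved before poke() runs.
    template_fields = ("payload",)

    def __init__(self, payload=None, **kwargs):
        super().__init__(**kwargs)
        self.payload = payload

    def poke(self, context):
        # By the time poke() runs, self.payload holds the rendered
        # value from the upstream task.
        return self.payload is not None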

I have checked this SO answer and this one as well.

DAG:

from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator

from datetime import datetime

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
}


@dag(default_args=default_args, schedule_interval=None, tags=["Http Operators"])
def http_operator():
    @task(multiple_outputs=True)
    def Get_payload(**kwargs):
        # STEP 1: Get data from database.

        # STEP 2: Manipulate data.

        # STEP 3: Return payload.
        data = {
            "key_1": "Value 1",
            "key_2": "Value 2",
            "key_3": "Value 3",
            "key_4": "Value 4",
        }

        return data

    get_data = Get_payload()

    ml_api = SimpleHttpOperator(
        task_id="some_api",
        http_conn_id="http_conn_id",
        method="POST",
        endpoint="/some-path",
        data=get_data,
        headers={"Content-Type": "application/json"},
    )

    get_data >> ml_api


http_operator_dag = http_operator()

Full log:

[2021-08-28 20:28:12,947] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [queued]>
[2021-08-28 20:28:12,970] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [queued]>
[2021-08-28 20:28:12,970] {taskinstance.py:1094} INFO - 
--------------------------------------------------------------------------------
[2021-08-28 20:28:12,971] {taskinstance.py:1095} INFO - Starting attempt 1 of 1
[2021-08-28 20:28:12,971] {taskinstance.py:1096} INFO - 
--------------------------------------------------------------------------------
[2021-08-28 20:28:12,982] {taskinstance.py:1114} INFO - Executing <Task(SimpleHttpOperator): clf_api> on 2021-08-28T20:28:10.265689+00:00
[2021-08-28 20:28:12,987] {standard_task_runner.py:52} INFO - Started process 19229 to run task
[2021-08-28 20:28:12,991] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'http_operator', 'clf_api', '2021-08-28T20:28:10.265689+00:00', '--job-id', '71', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/Http_Operator.py', '--cfg-path', '/tmp/tmp4l9hwi4q', '--error-file', '/tmp/tmpk1yrhtki']
[2021-08-28 20:28:12,993] {standard_task_runner.py:77} INFO - Job 71: Subtask clf_api
[2021-08-28 20:28:13,048] {logging_mixin.py:109} INFO - Running <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [running]> on host d332abee08c8
[2021-08-28 20:28:13,126] {taskinstance.py:1251} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=http_operator
AIRFLOW_CTX_TASK_ID=clf_api
AIRFLOW_CTX_EXECUTION_DATE=2021-08-28T20:28:10.265689+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-28T20:28:10.265689+00:00
[2021-08-28 20:28:13,128] {http.py:111} INFO - Calling HTTP method
[2021-08-28 20:28:13,141] {base.py:70} INFO - Using connection to: id: ML_API. Host: <IP-REMOVED>, Port: None, Schema: , Login: dexter, Password: ***, extra: {}
[2021-08-28 20:28:13,144] {http.py:140} INFO - Sending 'POST' to url: http://<IP-REMOVED>/classify
[2021-08-28 20:28:13,841] {http.py:154} ERROR - HTTP error: BAD REQUEST
[2021-08-28 20:28:13,842] {http.py:155} ERROR - <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>400 Bad Request</title>
<h1>Bad Request</h1>
<p>Failed to decode JSON object: Expecting value: line 1 column 1 (char 0)</p>

[2021-08-28 20:28:13,874] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 152, in check_response
    response.raise_for_status()
  File "/home/airflow/.local/lib/python3.8/site-packages/requests/models.py", line 953, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: BAD REQUEST for url: http://<IP-REMOVED>/classify

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/operators/http.py", line 113, in execute
    response = http.run(self.endpoint, self.data, self.headers, self.extra_options)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 141, in run
    return self.run_and_check(session, prepped_request, extra_options)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 198, in run_and_check
    self.check_response(response)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 156, in check_response
    raise AirflowException(str(response.status_code) + ":" + response.reason)
airflow.exceptions.AirflowException: 400:BAD REQUEST
[2021-08-28 20:28:13,882] {taskinstance.py:1505} INFO - Marking task as FAILED. dag_id=http_operator, task_id=clf_api, execution_date=20210828T202810, start_date=20210828T202812, end_date=20210828T202813
[2021-08-28 20:28:13,969] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-08-28 20:28:14,043] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check

  • Can you explain a little more on what you mean by "unable to pass data"? Are you seeing an error, is the value being passed to the `SimpleHttpOperator` not rendering correctly, etc.? – Josh Fell Aug 28 '21 at 19:45
  • I am getting a 400 Bad Request error in the API response; this happens when the payload is missing. – Dheemanth Bhat Aug 28 '21 at 19:46
  • Unfortunately I'm not able to reproduce the issue on Airflow 2.1.3. Your code works as written (posting to httpbin.org at least). I don't see any discernible differences between the responses either using a `SimpleHttpOperator` or `PythonOperator` to execute the POST request. Is it possible to post what the `PythonOperator` version code looks like for a comparison? – Josh Fell Aug 28 '21 at 20:20
  • Thanks. I am using version 2.1.3 as well. I have added the full log of the task in case it helps. I have replaced the actual IP address for security reasons. – Dheemanth Bhat Aug 28 '21 at 20:33
  • Thanks! Does this post help at all? https://stackoverflow.com/questions/37523457/got-failed-to-decode-json-object-when-calling-a-post-request-in-flask-python You might have to `json.dumps()` the dict before returning from "Get_payload" task. – Josh Fell Aug 28 '21 at 20:39

1 Answer


As suggested by @Josh Fell in the comments, I had two mistakes in my DAG.

  1. Wrap the data in json.dumps(data) before returning it from Get_payload (see the illustration after this list).
  2. Remove multiple_outputs=True from the task decorator of Get_payload (that flag unrolls a returned dict into separate XComs, which no longer applies once the task returns a plain JSON string).
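
Why this fixes it, as far as I understand it: `data` is one of `SimpleHttpOperator`'s templated fields, so the value coming from the upstream task is rendered to a string before the request is sent, and Python's `str()` of a dict uses single quotes, which is not valid JSON. A minimal standalone illustration (outside Airflow, with a made-up payload):

import json

data = {"key_1": "Value 1"}

# str() of a dict uses single quotes -- not valid JSON:
print(str(data))         # {'key_1': 'Value 1'}

# json.dumps() produces a valid, double-quoted JSON string:
print(json.dumps(data))  # {"key_1": "Value 1"}

# Roughly what the API server chokes on:
try:
    json.loads(str(data))
except json.JSONDecodeError as exc:
    print(f"Decode fails: {exc}")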

Final code:

import json

from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator

from datetime import datetime

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
}


@dag(default_args=default_args, schedule_interval=None, tags=["Http Operators"])
def http_operator():
    @task()
    def Get_payload(**kwargs):
        # STEP 1: Get data from database.

        # STEP 2: Manipulate data.

        # STEP 3: Return payload.
        data = {
            "key_1": "Value 1",
            "key_2": "Value 2",
            "key_3": "Value 3",
            "key_4": "Value 4",
        }

        return json.dumps(data)

    get_data = Get_payload()

    ml_api = SimpleHttpOperator(
        task_id="some_api",
        http_conn_id="http_conn_id",
        method="POST",
        endpoint="/some-path",
        data=get_data,
        headers={"Content-Type": "application/json"},
    )

    get_data >> ml_api


http_operator_dag = http_operator()