1

I'm trying to receive the HTTP response code back from a triggered Airflow SimpleHttpOperator. I've seen examples using 'lambda' type, and am doing so by looking in the body of the response, but I was hoping to be able to pass the response code off to a function. My current code (which is 90% from example_http_operator):

import json
from datetime import timedelta

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.sensors.http_sensor import HttpSensor
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['me@company.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

dag = DAG(dag_id='kick_off_java_task', default_args=default_args)

kickoff_task = SimpleHttpOperator(
     task_id='kick_off_c2c_java_task',
     http_conn_id='test',
     method='GET',
     endpoint='',
     data={ "command": "run" },
     response_check=lambda response: True if "Ok Message" in response.text else False,
     headers={},
     xcom_push=False,
     dag=dag
)

According to the documentation and code, there appears to be a way to have response_check point to a callable, but I am unclear of the syntax, or if I need to head in a completely different direction, such as leveraging xcom.

BPS
  • 607
  • 8
  • 29

2 Answers2

8

After a little trial and error, the solution turns out to be pretty simple:

dag = DAG(dag_id='kick_off_java_task', default_args=default_args)

def check(response):
    if response == 200:
        print("Returning True")
        return True
    else:
        print("Returning False")
        return False

kickoff_task = SimpleHttpOperator(
     task_id='kick_off_c2c_java_task',
     http_conn_id='c2c_test',
     method='GET',
     endpoint='',
     data={ "command": "run" },
     response_check=lambda response: True if check(response.status_code) is True else False,
     headers={},
     xcom_push=False,
     dag=dag
)

having the python function "check" defined before its use in the lambda, I can pass the parameter "response.status_code" to that function.

BPS
  • 607
  • 8
  • 29
0

just add response_check callable to the SimpleHttpoperator.

 SimpleHttpoperator(...,response_check=lambda response: response.status_code == 200)

One Additional point which is worth to note :

  • If the status code is not 200, then airflow marks it as failure job, If it is 200 then it marks as the success.
Ravi
  • 2,778
  • 2
  • 20
  • 32