I'm trying to rendered correctly data
inside a SimpleHttpOperator
in Airflow with configuration that I send via dag_run
result = SimpleHttpOperator(
task_id="schema_detector",
http_conn_id='schema_detector',
endpoint='api/schema/infer',
method='PUT',
data=json.dumps({
'url': '{{ dag_run.conf["url"] }}',
'fileType': '{{ dag_run.conf["fileType"] }}',
}),
response_check=lambda response: response.ok,
response_filter=lambda response: response.json())
Issue is that the rendered data appears to be like this
{"url": "{{ dag_run.conf[\"url\"] }}", "fileType": "{{ dag_run.conf[\"fileType\"] }}"}
I'm not sure what I'm doing wrong here.
Edit Full code below
default_args = {
'owner': 'airflow',
'start_date': days_ago(0),
}
def print_result(**kwargs):
ti = kwargs['ti']
pulled_value_1 = ti.xcom_pull(task_ids='schema_detector')
pprint.pprint(pulled_value_1)
with DAG(
dag_id='airflow_http_operator',
default_args=default_args,
catchup=False,
schedule_interval="@once",
tags=['http']
) as dag:
result = SimpleHttpOperator(
task_id="schema_detector",
http_conn_id='schema_detector',
endpoint='api/schema/infer',
method='PUT',
headers={"Content-Type": "application/json"},
data=json.dumps({
'url': '{{ dag_run.conf["url"] }}',
'fileType': '{{ dag_run.conf["fileType"] }}',
}),
response_check=lambda response: response.ok,
response_filter=lambda response: response.json())
pull = PythonOperator(
task_id='print_result',
python_callable=print_result,
)
result >> pull