I am creating a dag file, with multiple SimpleHttpOperator request. I need to skipped the next task if previous task returned a failed status. Only continue with success status.
Tried with BranchPythonOperator, which inside i will decide which task to run next. But seem it is not working.
sample of request_info will return
{
"data":{
"name":"Allan",
"age":"26",
"gender":"male",
"country":"California"
},
"status":"failed"
}
request_info = SimpleHttpOperator(
task_id='get_info',
endpoint='get/information',
http_conn_id='localhost',
data=({"guest":"1"})
headers={"Content-Type":"application/json"},
xcom_push=True,
dag=dag
)
update_info = SimpleHttpOperator(
task_id='update_info',
endpoint='update/information',
http_conn_id='localhost',
data=("{{ti.xcom_pull(task_ids='request_info')}}")
headers={"Content-Type":"application/json"},
xcom_push=True,
dag=dag
)
skipped_task = DummyOperator(
task_id='skipped',
dag=dag
)
skip_task = BranchPythonOperator(
task_id='skip_task',
python_callable=next_task,
dag=dag
)
def next_task(**kwangs):
status="ti.xcom_pull(task_ids='request_info')"
if status == "success":
return "update_info"
else:
return "skipped_task"
return "skipped_task"
request_info.set_downstream(skip_task)
#need set down stream base on ststus
I expect the flow should be, after getting the info. Identify status, if success, proceed update else proceed skipped.