I have a problem to solve:
Whenever someone registers, the web app logs an e-mail request for authentication code in the database and redirects to the next screen, where the user has to enter the six-digit code sent to his/her email.
This is the DAG I envisioned:
My DAG code:
default_args = {
"owner":"airflow",
"start_date": datetime(2021,4,2),
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"email":"rhonaldmoses@gmail.com",
"retries":1,
"retries_delay":timedelta(minutes=5)
}
def email_request_exists(response):
json_response_text = json.loads(response.text)
#by default, email requests do not exists
requests_exists = False
if json_response_text['status']==200:
if json_response_text['requests_exists'] > 0:
requests_exists = True
email_request_status = Variable.set('request_exists',requests_exists)
return email_request_status
def get_request_processor():
switch = Variable.get('request_exists')
if switch:
return 'anp_process_email_requests'
else:
return 'anp_no_email_to_process'
def check_is_request_processed(response):
print(response.text)
json_response_text = json.loads(response.text)
print(json_response_text)
if json_response_text['status']==200:
if json_response_text['request_processed'] > 0:
return 'anp_check_orphan_emails'
return 'anp_email_request_done'
with DAG(dag_id="artnpics_auth_email_manager", schedule_interval="* * * * *",
default_args=default_args,
catchup=False) as dag:
anp_is_email_request_available = HttpSensor(
task_id="anp_is_email_request_available",
method='GET',
http_conn_id='artnpics_api_calls',
endpoint='commsemail/api-email-request-processor/',
response_check=lambda response: True if email_request_exists(response) is True else False,
poke_interval=5,
timeout=20
)
anp_has_emails_to_process = BranchPythonOperator(
task_id='anp_has_emails_to_process',
python_callable=get_request_processor,
trigger_rule="one_success"
#trigger_rule="all_success"
)
#execute this when the email request queue check returned success
anp_process_email_requests = HttpSensor(
task_id="anp_process_email_requests",
method='GET',
http_conn_id='artnpics_api_calls',
endpoint='commsemail/api-email-process-requests/',
response_check=lambda response: True if check_is_request_processed(response) is True else False,
poke_interval=5,
timeout=20
)
#execute this when the email request queue check returned failure
anp_no_email_to_process = DummyOperator(
task_id='anp_no_email_to_process',
#trigger_rule="all_success"
)
#check records that are ommitted for a very long time and reprioritize them
anp_check_orphan_emails = DummyOperator(
task_id='anp_check_orphan_emails',
trigger_rule="one_success"
)
anp_email_request_done = DummyOperator(
task_id='anp_email_request_done',
trigger_rule="one_success"
)
anp_is_email_request_available >> anp_has_emails_to_process >> anp_no_email_to_process >> anp_email_request_done
anp_is_email_request_available >> anp_has_emails_to_process >> anp_process_email_requests >> anp_email_request_done
When I run, the anp_is_email_request_available task gets executed, and then it stops. However, it does not go to the next one (branch operator).
Basically, this is what I want to accomplish:
Execute anp_is_email_request_available and see whether the API returns true/false (if an email request exists with status=1, return True or else False).
If the returned value in anp_is_email_request_available is True, then execute anp_process_email_requests. Otherwise, process anp_email_request_done.
If the retuend value in anp_is_email_request_available is False, then execute anp_email_request_done
API is working fine (I removed all conditional branching and made it sequential, and it worked flawlessly).