I have a problem to solve:

Whenever someone registers, the web app logs an e-mail request for an authentication code in the database and redirects to the next screen, where the user has to enter the six-digit code sent to their email.

This is the DAG I envisioned:

[Image: diagram of the intended DAG]

My DAG code:

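from datetime import datetime, timedelta
import json

# Import paths below assume Airflow 2.x with the HTTP provider installed
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.providers.http.sensors.http import HttpSensor

default_args = {
    "owner":"airflow",
    "start_date": datetime(2021,4,2),
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "email":"rhonaldmoses@gmail.com",
    "retries":1,
    "retry_delay":timedelta(minutes=5)
}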
 
def email_request_exists(response):
    json_response_text = json.loads(response.text)
    
    #by default, no email request exists
    requests_exists = False
    
    if json_response_text['status']==200:
        if json_response_text['requests_exists'] > 0:
            requests_exists = True
 
    email_request_status = Variable.set('request_exists',requests_exists)
    return email_request_status
 
def get_request_processor():
    switch = Variable.get('request_exists')
    
    if switch:
        return 'anp_process_email_requests'
    else:
        return 'anp_no_email_to_process'
 
def check_is_request_processed(response):
    print(response.text)
    
    json_response_text = json.loads(response.text)
    print(json_response_text)
    
    if json_response_text['status']==200:
        if json_response_text['request_processed'] > 0:
            return 'anp_check_orphan_emails'
 
    return 'anp_email_request_done'
    
 
with DAG(dag_id="artnpics_auth_email_manager", schedule_interval="* * * * *",
         default_args=default_args, 
         catchup=False) as dag:
    
    anp_is_email_request_available = HttpSensor(
        task_id="anp_is_email_request_available",
        method='GET',
        http_conn_id='artnpics_api_calls',
        endpoint='commsemail/api-email-request-processor/',
        response_check=lambda response: True if email_request_exists(response) is True else False,
        poke_interval=5,
        timeout=20
    )
    
    anp_has_emails_to_process = BranchPythonOperator(
        task_id='anp_has_emails_to_process',
        python_callable=get_request_processor,
        trigger_rule="one_success"
        #trigger_rule="all_success"
    )
    
    #execute this when the email request queue check returned success
    anp_process_email_requests = HttpSensor(
        task_id="anp_process_email_requests",
        method='GET',
        http_conn_id='artnpics_api_calls',
        endpoint='commsemail/api-email-process-requests/',
        response_check=lambda response: True if check_is_request_processed(response) is True else False,
        poke_interval=5,
        timeout=20
    )
    
    #execute this when the email request queue check returned failure
    anp_no_email_to_process = DummyOperator(
        task_id='anp_no_email_to_process',
        #trigger_rule="all_success"
    )
    
    #check records that are omitted for a very long time and reprioritize them
    anp_check_orphan_emails = DummyOperator(
        task_id='anp_check_orphan_emails',
        trigger_rule="one_success"
    )
    
    anp_email_request_done = DummyOperator(
        task_id='anp_email_request_done',
        trigger_rule="one_success"
    )
    
    anp_is_email_request_available >> anp_has_emails_to_process >> anp_no_email_to_process >> anp_email_request_done
    anp_is_email_request_available >> anp_has_emails_to_process >> anp_process_email_requests >> anp_email_request_done

When I run the DAG, the anp_is_email_request_available task executes and then the run stops. It never proceeds to the next task (the branch operator).

Basically, this is what I want to accomplish:

Execute anp_is_email_request_available and see whether the API returns true/false (if an email request exists with status=1, return True or else False).

If the value returned by anp_is_email_request_available is True, execute anp_process_email_requests.

If the value returned by anp_is_email_request_available is False, execute anp_email_request_done.

The API is working fine (I removed all conditional branching and made the DAG sequential, and it worked flawlessly).

Olaf Kock
Rhonald

1 Answer


Check the logs of anp_is_email_request_available for more details, but I think the problem may be the callable you are using in the lambda for response_check:

response_check=lambda response: True if email_request_exists(response) is True else False,

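email_request_exists returns None instead of True (its return value is the result of Variable.set, which does not return anything), and that makes response_check fail. The same happens later with check_is_request_processed, which returns a task_id string rather than True. You can either change the return values of those functions or change the ternary operator to evaluate the truthiness of the value: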

response_check=lambda response: True if email_request_exists(response) else False,
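The first option (changing the return value) could look roughly like the sketch below. It assumes the response body is JSON with the `status` and `requests_exists` fields used in the question. The `SimpleNamespace` object is a hypothetical stand-in for the HTTP response that the sensor passes in, and the `Variable.set` call is left out so that the snippet runs without Airflow:

```python
import json
from types import SimpleNamespace


def email_request_exists(response):
    """Return True only when the API reports at least one pending request."""
    payload = json.loads(response.text)
    # Return the boolean itself. Returning the result of Variable.set()
    # would hand None to response_check.
    return payload.get("status") == 200 and payload.get("requests_exists", 0) > 0


# Hypothetical stand-in for the HTTP response, for illustration only.
fake = SimpleNamespace(text='{"status": 200, "requests_exists": 3}')
print(email_request_exists(fake))  # True
```

With a function like this, `response_check=email_request_exists` can be passed directly, without the lambda. Keep in mind that a sensor treats a falsy check as "not ready yet": it keeps poking until `timeout` and then fails, so a False result on its own will not let the DAG continue to the branch task.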

You can find an example of the callable in this answer. Hope that works for you!
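One more thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: Airflow stores Variables as strings unless JSON serialization is requested, so `Variable.get('request_exists')` would give back `'True'` or `'False'`, and any non-empty string is truthy in an `if`. Below is a sketch of a branch callable that parses the flag explicitly. `choose_branch` is a hypothetical helper, and the stored value is passed in as an argument so that the snippet runs without Airflow:

```python
def choose_branch(stored_flag):
    """Map a stored flag (possibly the string 'False') to a task_id."""
    # Normalize: accept real booleans as well as their string forms.
    has_requests = str(stored_flag).strip().lower() == "true"
    if has_requests:
        return "anp_process_email_requests"
    return "anp_no_email_to_process"


print(bool("False"))           # True: a plain `if switch:` takes the first branch
print(choose_branch("False"))  # anp_no_email_to_process
```

Inside the DAG, the branch callable would then return `choose_branch(Variable.get('request_exists'))`.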

NicoE
  • Unfortunately, this didn't work. email_request_exists returns True/False, and I do get those values. – Rhonald May 10 '21 at 23:57