2

I want to get the exception passed to on_failure_callback in order to check what is the error. For example, if it contains 'there are duplicates' in a certain DAG, the function won't do anything. Otherwise, it will send an email.

However, I am not able to see the format of the exception. I am using Airflow 2.1.2 in Docker and being my dag definition the following:

with DAG(process_name,
         default_args=default_args,
         schedule_interval='@daily',
         max_active_runs=1,
         tags=['import', 'es'],
         on_failure_callback=known_error_dag
         ) as dag:
    operators

The next solutions have been tried:

def known_error_dag(context):
    #  1
    ti = context['ti']
    ti.xcom_push(key='exception', value=context['exception'])

    #  2
    print(context['exception'])
    
    #  3
    logging.info(context['exception'])

I can see the exception neither in the UI nor docker logs. Also, it doesn't appear in XCOM.

The answers to this question make unclear if what I want is possible: Get Exception details on Airflow on_failure_callback context

However, Astronomer course states that it is possible indeed. https://academy.astronomer.io/astronomer-certification-apache-airflow-dag-authoring-preparation

Javier Lopez Tomas
  • 2,072
  • 3
  • 19
  • 41

1 Answers1

4

You can define on_failure_callback on the DAG and on the task level. Exceptions are only passed to the failure callback on the task level, so configure the callback either on your operator, or via default_args on the DAG to all operators:

with DAG(
    process_name,
    default_args=default_args,
    schedule_interval='@daily',
    max_active_runs=1,
    tags=['import', 'es'],
    default_args={
        "on_failure_callback": known_error_dag,
    },
) as dag:

The on_failure_callback defined on the DAG-level will also take context variables, which includes a key "reason", but that only states "task_failure" in case of a failed DAG run, so isn't very useful in most cases.

Bas Harenslak
  • 2,591
  • 14
  • 14
  • I have tried with this and still not working. If I enter into the Task Instance Details menu of the failed task, the section on_failure_callback has indeed my defined function, but I can't see any log/print and the xcom still not there – Javier Lopez Tomas Aug 30 '21 at 13:13