
I want to customize my DAG to call a Databricks notebook on success or failure. I have created two different functions to call a Databricks notebook for the success and failure cases. The success/failure callback function is being called, but the Databricks notebook is not executing. Here is the sample code.

def task_success_callback(context):
    """ task_success callback """
    context['task_instance'].task_id
    print("success case")
    dq_notebook_success_task_params = {
        'existing_cluster_id': Variable.get("DATABRICKS_CLUSTER_ID"),
        'notebook_task': {
            'notebook_path': '/AAA/Airflow/Operators/audit_file_operator',
            'base_parameters': {
                "root": "dbfs:/mnt/aaa",
                "audit_file_path": "/success_file_path/",
                "table_name": "sample_data_table",
                "audit_flag": "success"
            }
        }
    }

    DatabricksSubmitRunOperator(
        task_id="weather_table_task_id",
        databricks_conn_id='databricks_conn',
        json=dq_notebook_success_task_params,
        do_xcom_push=True,
        secrets=[
            secret.Secret(
                deploy_type='env',
                deploy_target=None,
                secret='adf-service-principal',
            ),
            secret.Secret(
                deploy_type='env',
                deploy_target=None,
                secret='postgres-credentials',
            ),
        ],
    )

def task_failure_callback(context):
    """ task_success callback """
    context['task_instance'].task_id
    print("failure case")
    dq_notebook_failure_task_params = {
        'existing_cluster_id': Variable.get("DATABRICKS_CLUSTER_ID"),
        'notebook_task': {
            'notebook_path': '/AAA/Airflow/Operators/audit_file_operator',
            'base_parameters': {
                "root": "dbfs:/mnt/aaa",
                "audit_file_path": "/failure_file_path/",
                "table_name": "sample_data_table",
                "audit_flag": "failure"
            }
        }
    }

    DatabricksSubmitRunOperator(
        task_id="weather_table_task_id",
        databricks_conn_id='databricks_conn',
        json=dq_notebook_failure_task_params,
        do_xcom_push=True,
        secrets=[
            secret.Secret(
                deploy_type='env',
                deploy_target=None,
                secret='adf-service-principal',
            ),
            secret.Secret(
                deploy_type='env',
                deploy_target=None,
                secret='postgres-credentials',
            ),
        ],
    )

DEFAULT_ARGS = {
    "owner": "admin",
    "depends_on_past": False,
    "start_date": datetime(2020, 9, 23),
    "on_success_callback": task_success_callback,
    "on_failure_callback": task_failure_callback,
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(seconds=10),
}

==================
Remaining DAG code
==================
Kiran

2 Answers


In Airflow, every operator has an execute() method that defines the operator's logic. When you create your workflow, Airflow initializes the constructor, renders the templates, and calls the execute method for you. However, when you define an operator inside a Python function, you need to handle this on your own.

So when you write:

def task_success_callback(context):
   DatabricksSubmitRunOperator(..)

All you did here is initialize the DatabricksSubmitRunOperator constructor. You didn't invoke the operator's logic.

What you need to do is:

def task_success_callback(context):
   op = DatabricksSubmitRunOperator(..)
   op.execute(context)

Note that invoking operators in this manner is not advised. Callbacks have no retry mechanism and should be kept simple, and invoking an operator inside a callback often indicates a design problem. I would suggest exploring the option of a custom operator that handles this logic, thus avoiding the need to use DatabricksSubmitRunOperator in the success callback.
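Applied to the code from the question, the success callback might look like the following. This is a minimal sketch, not a drop-in fix: it assumes the same imports (DatabricksSubmitRunOperator, Variable) that the rest of the DAG file already has, and it leaves out the secrets argument for brevity.

def task_success_callback(context):
    """ task_success callback (sketch) """
    dq_notebook_success_task_params = {
        'existing_cluster_id': Variable.get("DATABRICKS_CLUSTER_ID"),
        'notebook_task': {
            'notebook_path': '/AAA/Airflow/Operators/audit_file_operator',
            'base_parameters': {
                "root": "dbfs:/mnt/aaa",
                "audit_file_path": "/success_file_path/",
                "table_name": "sample_data_table",
                "audit_flag": "success",
            },
        },
    }

    op = DatabricksSubmitRunOperator(
        task_id="weather_table_task_id",
        databricks_conn_id='databricks_conn',
        json=dq_notebook_success_task_params,
        do_xcom_push=True,
    )
    # Constructing the operator only builds the object; execute() is what
    # actually submits the run to Databricks.
    op.execute(context)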

Elad Kalif
  • Thanks Elad for your response, it worked for me. – Kiran Jan 13 '21 at 14:29
  • @Kiran if solved please accept the answer :) – Elad Kalif Jan 13 '21 at 15:05
  • Elad, any idea how to pass parameters to the task_success_callback(context) function? – Kiran Jan 19 '21 at 09:56
  • @Kiran see https://stackoverflow.com/questions/51851535/pass-other-arguments-to-on-failure-callback – Elad Kalif Jan 19 '21 at 10:10
  • Elad, thanks for your response, but that covers only static parameters; I need to pass dynamic parameters. Can you please help me with how to pass dynamic parameters? – Kiran Jan 20 '21 at 01:02
  • By dynamic you mean what? Where are the values taken from? You can always push the value to XCom and pull it in the callback function. – Elad Kalif Jan 20 '21 at 05:52
  • Suppose we need to load data for four tables from source to destination using a DAG; for that we have to create four independent tasks in a single DAG. For each of those four tasks the callback function is called (on either success or failure), so I need to pass parameters to the callback function for each of the four calls. – Kiran Jan 21 '21 at 15:12
  • @Kiran can you explain what this parameter is? Table name? Number of rows? At what point in your ETL do you know the value of this parameter? – Elad Kalif Jan 21 '21 at 15:24
  • Here is sample code following up on the above code – Kiran Jan 22 '21 at 07:13
  • 'function' object has no attribute 'update_relative' – Helen Kapatsa Jul 19 '23 at 20:23
  • @HelenKapatsa I'm not sure the error you mention is related to the question. There is no dependency between operators in the code snippet, so update_relative doesn't make much sense here. This may be a different problem in your code; I suggest opening a new question with your code example. – Elad Kalif Jul 20 '23 at 08:20
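Following up on the XCom suggestion in the comments above: a callback receives the task's context, so a value pushed to XCom by an upstream task can be pulled inside the callback. A minimal sketch, where the task_id 'load_table_task' and the key 'table_name' are hypothetical examples, not names from the original DAG:

def task_success_callback(context):
    ti = context['task_instance']
    # Pull a value that an upstream task pushed to XCom.
    # 'load_table_task' and 'table_name' are placeholders; use the
    # task_id and key your DAG actually pushes.
    table_name = ti.xcom_pull(task_ids='load_table_task', key='table_name')
    print(f"success callback for table: {table_name}")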
import collections
from datetime import datetime, timedelta
from functools import partial

from airflow import DAG

# Each entry describes one table to load and where its audit file is written.
TableList = collections.namedtuple(
    "table_list",
    "table_name audit_file_name",
)
LIST_OF_TABLES = [
    TableList(
        table_name="table1",
        audit_file_name="/testdata/Audit_files/",
    ),
    TableList(
        table_name="table2",
        audit_file_name="/testdata/Audit_files/",
    ),
    TableList(
        table_name="table3",
        audit_file_name="/testdata/Audit_files/",
    ),
    TableList(
        table_name="table4",
        audit_file_name="/testdata/Audit_files/",
    ),
]

for table in LIST_OF_TABLES:
    # Bind the table-specific values to the callbacks with functools.partial,
    # so each callback receives table_name and audit_file_name before context.
    DEFAULT_ARGS = {
        "owner": "admin",
        "depends_on_past": False,
        "start_date": datetime(2020, 9, 23),
        "on_success_callback": partial(task_success_callback, table.table_name, table.audit_file_name),
        "on_failure_callback": partial(task_failure_callback, table.table_name, table.audit_file_name),
        "email": ["airflow@airflow.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(seconds=10),
    }
    WORKFLOW = DAG(
        'test_dag',
        default_args=DEFAULT_ARGS,
        schedule_interval="30 3 * * 1",
        catchup=False,
    )
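Since partial binds the two extra arguments ahead of the context that Airflow passes, the callbacks would need a matching signature. A minimal sketch, with the function body abbreviated and assumed to follow the accepted answer:

def task_success_callback(table_name, audit_file_name, context):
    """Success callback: partial-bound values arrive first, then Airflow's context."""
    print(f"success case for {table_name}, audit file at {audit_file_name}")
    # ...build the notebook params from table_name / audit_file_name, then
    # instantiate DatabricksSubmitRunOperator(...) and call .execute(context)
    # as shown in the accepted answer.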
Kiran