
I am trying to create a Slack alert that posts some basic information into a Slack channel when a task in an Airflow DAG runs successfully. My code is as follows:

import datetime

from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

SLACK_CONN_ID = "slack_test"


def task_success_slack_alert(context):
    """
    Callback task that can be used in DAG to alert of successful task completion
    Args:
        context (dict): Context variable passed in from Airflow
    Returns:
        None: Calls the SlackWebhookOperator execute method internally
    """

    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = """
            :large_blue_circle: Task completed successfully
            *Task*: {task}  
            *Dag*: {dag} 
            *Execution Time*: {exec_date}  
            *Start Time*: {start}
            *End Time*: {end}
            *Duration*: {duration} seconds
            *Try* {try_number} *of max retries* {max_tries}
            *Log Url*: {log_url} 
            """.format(
        task=context.get("task_instance").task_id,
        dag=context.get("task_instance").dag_id,
        ti=context.get("task_instance"),
        exec_date=context.get("execution_date"),
        log_url=context.get("task_instance").log_url,
        start=context.get("task_instance").start_date,
        end=context.get("task_instance").end_date,
        duration=context.get("task_instance").duration,
        try_number=context.get("task_instance")._try_number,
        max_tries=context.get("task_instance").max_tries + 1
        )

    success_alert = SlackWebhookOperator(
        task_id="slack_test",
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username="airflow",
        )

    return success_alert.execute(context=context)

default_dag_args = {'owner': 'TEST',
                    'start_date': datetime.datetime(2021, 3, 29),
                    'retries': 1,
                    'use_legacy_sql': False,
                    'on_success_callback': task_success_slack_alert
                    }

with models.DAG('TESTING',
                schedule_interval='0 9 * * *',
                default_args=default_dag_args) as dag:

I then have a series of BigQueryOperator tasks that write data into BigQuery tables. The Slack message is posted after every task runs; however, sometimes the 'End Time' and 'Duration' are 'None', even though when I check the logs for the DAG I can see the end time is recorded correctly. Sometimes the Duration/End Time are populated and sometimes they aren't, and I can't see a logical pattern as to why. I have also noticed that sometimes it posts the Slack alert twice for the same task, once with the Duration/End Time as 'None' and then again with them populated. Does anyone know why this is happening?
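
For reference, each of those tasks looks roughly like the sketch below inside the DAG block above; the task_id, SQL, and destination table are placeholders rather than my real values:

    write_to_bq = BigQueryOperator(
        task_id='write_example_table',  # placeholder task name
        sql='SELECT * FROM `my-project.my_dataset.source_table`',  # placeholder query
        destination_dataset_table='my-project.my_dataset.example_table',  # placeholder table
        write_disposition='WRITE_TRUNCATE',
        # use_legacy_sql=False is inherited from default_dag_args
    )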


soph020420
  • What version of Airflow are you running? I would also check the worker logs for clues since the on_success_callback will be executed by the worker when the task is successful. – Alan Ma Apr 01 '21 at 23:12
  • @AlanMa I'm using Airflow version 1.10.10+composer – soph020420 Apr 07 '21 at 10:23

1 Answer


There are two points where on_success_callback can be called.

  1. When a task is successful after execution (taskinstance.py).
  2. When a task's state in the metadata database doesn't reflect what the process is currently doing (local_task_job.py).

There are cases where callbacks are executed twice in quick succession due to race conditions between the task instance execution and the local task job. This GitHub issue describes a scenario similar to yours.

Without knowing the times at which the Slack messages are sent, I can't say for sure that it is the same issue. If I had to guess, the Slack message with the missing end time and duration comes from the local task job, and the Slack message with the metadata filled in comes from the task instance.

If you add some caller inspection code to your callback, you can figure out where it's coming from. Please see this Stack Overflow post on getting caller information.
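
For instance, a minimal sketch of that kind of inspection at the top of your callback, using Python's standard inspect module (the logger usage here is illustrative):

import inspect
import logging

def task_success_slack_alert(context):
    # Log which file/function invoked this callback, so the taskinstance.py
    # call site can be told apart from the local_task_job.py one.
    caller = inspect.stack()[1]
    logging.getLogger(__name__).info(
        "on_success_callback invoked from %s:%s (%s)",
        caller.filename, caller.lineno, caller.function,
    )
    # ... rest of the callback unchanged ...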


You mentioned that the callback from local_task_job.py has the full set of metadata output. This does make sense from the code's perspective. The end_date is not set before on_success_callback is invoked in taskinstance.py. On the other hand, when the local task job heartbeats and sees that the state is SUCCESS, it runs on_success_callback, and by that point end_date has been set.

That reminds me: I ran into this issue in the past as well and worked around it by using the current time at which the success callback is invoked as the end time.
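
Something along these lines inside the callback would do it; this is a sketch that assumes treating "now" as the end time is acceptable whenever the real end_date hasn't been written yet:

from airflow.utils import timezone

def task_success_slack_alert(context):
    ti = context.get("task_instance")
    # When the callback fires from taskinstance.py, end_date and duration are
    # not set yet, so fall back to the current time and derive the duration.
    end = ti.end_date or timezone.utcnow()
    duration = ti.duration if ti.duration is not None else (end - ti.start_date).total_seconds()
    # ... build the Slack message with `end` and `duration` as before ...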

Alan Ma
  • Thank you, I added some caller inspection code and it looks like on_success_callback is called: 1. in **taskinstance.py** by the function **_run_raw_task**: this is the one that runs every time but does not include the end_time or duration; 2. in **local_task_job.py** by the function **heartbeat_callback**: this only runs sometimes, but when it does it includes all of the required information. Do you know why this would be? – soph020420 Apr 07 '21 at 10:16
  • Posted an update! I think the interactions are all expected and I also included a workaround. – Alan Ma Apr 08 '21 at 16:12