I am trying to create a Slack alert that posts some basic information into a Slack channel when a task in an Airflow DAG runs successfully. My code is as follows:
```python
import datetime

from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.hooks.base_hook import BaseHook

SLACK_CONN_ID = "slack_test"


def task_success_slack_alert(context):
    """
    Callback that can be used in a DAG to alert on successful task completion.

    Args:
        context (dict): Context variable passed in from Airflow

    Returns:
        None: Calls the SlackWebhookOperator execute method internally
    """
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    ti = context.get("task_instance")
    slack_msg = """
:large_blue_circle: Task completed successfully
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Start Time*: {start}
*End Time*: {end}
*Duration*: {duration} seconds
*Try* {try_number} *of max retries* {max_tries}
*Log Url*: {log_url}
""".format(
        task=ti.task_id,
        dag=ti.dag_id,
        exec_date=context.get("execution_date"),
        log_url=ti.log_url,
        start=ti.start_date,
        end=ti.end_date,
        duration=ti.duration,
        try_number=ti._try_number,
        max_tries=ti.max_tries + 1,
    )
    success_alert = SlackWebhookOperator(
        task_id="slack_test",
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username="airflow",
    )
    return success_alert.execute(context=context)


default_dag_args = {
    'owner': 'TEST',
    'start_date': datetime.datetime(2021, 3, 29),
    'retries': 1,
    'use_legacy_sql': False,
    # Passed via default_args, so the callback is attached to every task
    'on_success_callback': task_success_slack_alert,
}

with models.DAG('TESTING',
                schedule_interval='0 9 * * *',
                default_args=default_dag_args) as dag:
    # ... series of BigQueryOperator tasks (not shown) ...
```
I then have a series of BigQueryOperator tasks that write data into BigQuery tables. A Slack message is posted after every task runs; however, sometimes the 'End Time' and 'Duration' fields are 'None', even though the DAG's logs show that the end time was recorded correctly. Sometimes Duration/End Time are populated and sometimes they are not, and I can't see any pattern as to why.

I have also noticed that the alert is sometimes posted twice for the same task: once with Duration/End Time as 'None' and then again with them populated. Does anyone know why this is happening?
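One possible explanation, which is an assumption and is not confirmed by the logs described above, is that the callback sometimes runs before the task instance's `end_date` and `duration` have been written. Under that assumption, a defensive way to build the message is to fall back to the current time when those fields are still `None` and to label the value as estimated. The helpers `timing_fields` and `build_message` are hypothetical, and a `SimpleNamespace` stands in for Airflow's `TaskInstance` so the sketch runs without Airflow. It also assumes `start_date` is a timezone-aware UTC datetime.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Any, Dict, Optional

# Trimmed version of the question's template (hypothetical helper code).
MESSAGE_TEMPLATE = """
:large_blue_circle: Task completed successfully
*Task*: {task}
*Dag*: {dag}
*Start Time*: {start}
*End Time*: {end}{end_note}
*Duration*: {duration} seconds
"""


def timing_fields(ti: Any, now: Optional[datetime] = None) -> Dict[str, Any]:
    """Return start/end/duration, estimating end (and duration) from `now`
    when the task instance has not recorded them yet.

    Assumption: end_date/duration may still be None when the callback runs.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    start = ti.start_date
    end_estimated = ti.end_date is None
    end = now if end_estimated else ti.end_date
    if ti.duration is not None:
        duration = ti.duration
    elif start is not None:
        # Both datetimes must be timezone-aware (UTC) for this subtraction.
        duration = (end - start).total_seconds()
    else:
        duration = None
    return {"start": start, "end": end, "duration": duration,
            "end_estimated": end_estimated}


def build_message(ti: Any, now: Optional[datetime] = None) -> str:
    """Fill the Slack message template from a task-instance-like object."""
    fields = timing_fields(ti, now)
    return MESSAGE_TEMPLATE.format(
        task=ti.task_id,
        dag=ti.dag_id,
        start=fields["start"],
        end=fields["end"],
        end_note=" (estimated)" if fields["end_estimated"] else "",
        duration=fields["duration"],
    )


if __name__ == "__main__":
    # Stand-in for a TaskInstance whose end_date/duration are not set yet.
    started = datetime(2021, 3, 29, 9, 0, 0, tzinfo=timezone.utc)
    pending_ti = SimpleNamespace(task_id="load_table", dag_id="TESTING",
                                 start_date=started, end_date=None,
                                 duration=None)
    print(build_message(pending_ti, now=started + timedelta(seconds=30)))
```

Inside `task_success_slack_alert`, `build_message(context.get("task_instance"))` could stand in for the hand-built `slack_msg`, so a missing end time shows up as an estimate rather than `None`.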