I set up a conditional task in Airflow, described here. All it does is check whether a Hive partition exists. If it does, proceed with the rest of the tasks; if not, add the partition first before proceeding. The conditional check task can either fail or succeed, and both outcomes are okay. However, I have a PagerDuty email notification set up for the DAG because I want to know when the downstream tasks fail. How do I mute the failure notification on that specific conditional task so that I don't get a false alarm on PagerDuty?
- How does this PagerDuty alert get triggered? If it's with `on_failure_callback` or `email_on_failure`, you should be able to just set them to `None` for that particular task. – Daniel Huang Jan 24 '18 at 04:14
- It uses `email_on_failure` as part of `default_args`, which `dag=dag` applies to all tasks. If I set it to `None` there, I won't find out about other task failures in the same DAG. – Stella Jan 24 '18 at 19:24
- Yeah, so you should be able to keep it set in `default_args` so all your other tasks have the alert, then override it when you define your one task, i.e. `BashOperator(task_id='foo', dag=dag, on_failure_callback=None)`. – Daniel Huang Jan 24 '18 at 20:44
- It does not work. – Stella Feb 01 '18 at 23:07
- Sorry, so you tried `BashOperator(task_id='foo', dag=dag, email_on_failure=None)`? `on_failure_callback` was a typo in my last comment. – Daniel Huang Feb 02 '18 at 10:29
1 Answer
`email_on_failure`, `on_failure_callback`, etc. are task-level (operator-level) parameters. They inherit their values from the `default_args` you pass to the DAG, but you can also override them when you instantiate the operator:
YourOperator(task_id='task1', dag=dag, email_on_failure=False, on_failure_callback=None, ...)
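For example, here is a minimal sketch of that pattern (the DAG id, task ids, email address, and bash commands are illustrative placeholders, not from the original question): the failure email stays in `default_args` for every task and is switched off only for the conditional check.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 1, 1),
    # Assumed PagerDuty email-integration address; replace with your own.
    'email': ['my-service@pagerduty-example.com'],
    'email_on_failure': True,   # every task alerts on failure by default
}

dag = DAG('hive_partition_example', default_args=default_args, schedule_interval='@daily')

# The conditional check is allowed to fail, so mute its failure email.
check_partition = BashOperator(
    task_id='check_partition',
    bash_command='exit 1',       # placeholder for the real partition check
    email_on_failure=False,      # overrides the value inherited from default_args
    dag=dag)

# Downstream tasks inherit email_on_failure=True from default_args,
# so their failures still trigger the PagerDuty email.
process_data = BashOperator(
    task_id='process_data',
    bash_command='echo "do the real work"',
    dag=dag)

check_partition >> process_data

Passing `False` matches the boolean type of the parameter, but as the `handle_failure` source below shows, any falsy value prevents the email from being sent.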
Here is the source code showing how Airflow handles those callbacks when a task fails, which should make it clearer how this works:
def handle_failure(self, error, test_mode=False, context=None):
    self.log.exception(error)
    task = self.task
    session = settings.Session()
    self.end_date = datetime.utcnow()
    self.set_duration()
    Stats.incr('operator_failures_{}'.format(task.__class__.__name__), 1, 1)
    Stats.incr('ti_failures')
    if not test_mode:
        session.add(Log(State.FAILED, self))

        # Log failure duration
        session.add(TaskFail(task, self.execution_date, self.start_date, self.end_date))

    # Let's go deeper
    try:
        # Since this function is called only when the TI state is running,
        # try_number contains the current try_number (not the next). We
        # only mark task instance as FAILED if the next task instance
        # try_number exceeds the max_tries.
        if task.retries and self.try_number <= self.max_tries:
            self.state = State.UP_FOR_RETRY
            self.log.info('Marking task as UP_FOR_RETRY')
            if task.email_on_retry and task.email:
                self.email_alert(error, is_retry=True)
        else:
            self.state = State.FAILED
            if task.retries:
                self.log.info('All retries failed; marking task as FAILED')
            else:
                self.log.info('Marking task as FAILED.')
            if task.email_on_failure and task.email:
                self.email_alert(error, is_retry=False)
    except Exception as e2:
        self.log.error('Failed to send email to: %s', task.email)
        self.log.exception(e2)

    # Handling callbacks pessimistically
    try:
        if self.state == State.UP_FOR_RETRY and task.on_retry_callback:
            task.on_retry_callback(context)
        if self.state == State.FAILED and task.on_failure_callback:
            task.on_failure_callback(context)
    except Exception as e3:
        self.log.error("Failed at executing callback")
        self.log.exception(e3)

    if not test_mode:
        session.merge(self)
    session.commit()
    self.log.error(str(error))
https://airflow.apache.org/_modules/airflow/models.html#BaseOperator
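If your PagerDuty page is wired up through `on_failure_callback` instead of the failure email, the same per-task override applies. A rough sketch, where `notify_pagerduty` is a hypothetical stand-in for whatever actually pages you:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

def notify_pagerduty(context):
    # Hypothetical notifier; a real one would call your PagerDuty
    # integration (Events API, email gateway, etc.).
    ti = context['task_instance']
    print('Paging: task %s failed in DAG %s' % (ti.task_id, ti.dag_id))

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 1, 1),
    'on_failure_callback': notify_pagerduty,   # every task pages by default
}

dag = DAG('hive_partition_callback_example', default_args=default_args,
          schedule_interval='@daily')

# As handle_failure above shows, the callback only runs when
# task.on_failure_callback is truthy, so None mutes it for this task.
check_partition = BashOperator(
    task_id='check_partition',
    bash_command='exit 1',         # placeholder for the real partition check
    on_failure_callback=None,      # no page when this task fails
    dag=dag)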

Fan