In my case, I had issues with the accepted answers:
- Raising `Ignore()` or `Reject()` from within the task left the task correctly in the `FAILURE` state, but any Celery workflow (think `chain`, `chord`, or `group`) containing the failing task would hang:

```python
workflow = chain(my_task.si([], *args, **kwargs), other_task.s(*args, **kwargs))
res = workflow()
results = res.get()  # hangs as the workflow never enters the ready state
```
- I wanted the rest of the workflow to still run even if one of the tasks failed (without propagating exceptions or relying on global error handlers, which are difficult to work with)
So I ended up always marking the task as success and doing my own error post-processing after the workflow ends (always successfully):
```python
import traceback

@app.task(bind=True)
def my_task(self, prev, arg1, arg2, opts=None):
    results = []
    try:
        # task code goes here
        task_state = 'SUCCESS'
        task_exc = None
    except BaseException as exc:
        task_state = 'FAILURE'
        task_exc = exc
    finally:
        state = {
            'state': 'SUCCESS',  # always reported as success
            'meta': {
                'results': results,
                'custom_attr': 'test',
                # task metadata as needed
            },
        }
        if task_state == 'FAILURE':
            exc_str = ''.join(traceback.format_exception(
                type(task_exc),
                task_exc,
                task_exc.__traceback__))
            state['meta']['error'] = exc_str
        # Update task state with final status
        self.update_state(**state)
    # Return the metadata dict so res.get() yields it downstream
    return state['meta']
```
This has the advantage of:
- Keeping all the needed exception data (traceback)
- Avoiding Celery's odd post-processing of task states:
  - when a task fails, Celery replaces the dict metadata with the exception instance, which prevents having a consistent way of accessing task metadata
  - updating the state to `FAILURE` without raising `Ignore()` or `Reject()` always results in the task state being `SUCCESS`
  - ...
- Making complex workflows far more resilient to failures.
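For reference, the traceback-to-string step used in the `FAILURE` branch can be exercised on its own (plain standard-library `traceback`, no Celery needed):

```python
import traceback

# Capture a real exception and flatten its traceback into a single
# string, the same formatting used in the task's FAILURE branch.
try:
    1 / 0
except ZeroDivisionError as exc:
    exc_str = ''.join(traceback.format_exception(
        type(exc), exc, exc.__traceback__))

print(exc_str)  # full multi-line traceback, as one string
```

Joining with `''` (not `' '`) matters: `format_exception` returns lines that already end in newlines.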
This allows me to always process workflow results in the following way:
```python
info = res.get()  # this is always a dict containing metadata
error = info.get('error')
results = info['results']
custom_attr = info['custom_attr']
```
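The shape of that pattern can be tried without a broker. Below is a minimal, broker-free sketch of the same idea: `run_capturing` is a hypothetical stand-in for the task body, with `self.update_state` replaced by simply returning the metadata dict:

```python
import traceback

def run_capturing(func, *args, **kwargs):
    """Run func, always 'succeed', and stash any error in the metadata.

    Same try/except shape as the Celery task above, minus the broker:
    the metadata dict is returned directly instead of via update_state.
    """
    results = None
    task_exc = None
    try:
        results = func(*args, **kwargs)
    except BaseException as exc:
        task_exc = exc
    meta = {'results': results, 'custom_attr': 'test'}
    if task_exc is not None:
        # Keep the full traceback as a string in the metadata
        meta['error'] = ''.join(traceback.format_exception(
            type(task_exc), task_exc, task_exc.__traceback__))
    return meta

ok = run_capturing(lambda: [1, 2, 3])   # {'results': [1, 2, 3], ...}
bad = run_capturing(lambda: 1 / 0)      # no raise; 'error' key holds the traceback
```

Both calls return a dict with the same keys available, which is exactly what makes the downstream `info.get('error')` / `info['results']` processing uniform.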