
I have a check_orders task that's executed periodically. It makes a group of tasks so that I can time how long executing the tasks took, and perform something when they're all done (this is the purpose of res.join [1] and grouped_subs). The tasks that are grouped are pairs of chained tasks.

What I want is for when the first task doesn't meet a condition (fails) don't execute the second task in the chain. I can't figure this out for the life of me and I feel this is pretty basic functionality for a job queue manager. When I try the things I have commented out after [2] (raising exceptions, removing callbacks)... we get stuck on the join() in check_orders for some reason (it breaks the group). I've tried setting ignore_result to False as well for all these tasks but it still doesn't work.

@task(ignore_result=True)
def check_orders():
    # check all the orders and send out appropriate notifications
    grouped_subs = []

    for thingy in things:
       ...

        grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )), 
                        notify.subtask((args_sub_2, ), immutable=True)))

    res = group(grouped_subs).apply_async()

    res.join()         #[1]
    logger.info('Done checking orders at %s' % current_task.request.id)

@task(ignore_result=True)
def is_room_open(args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        # [2]
        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
        # None of the following things work:
        # is_room_open.update_state(state='FAILURE')
        # raise celery.exceptions.Ignore()
        # raise Exception('spam', 'eggs')
        # current_task.request.callbacks[:] = []

@task(ignore_result=True)
def notify(args_sub_2):
    # something else time consuming, only do this if the first part of the chain 
    # passed a test (the chained tasks before this were 'successful'
    notify_user(args_sub_2)
Salami
    For Celery 4.0 you can use this answer - https://stackoverflow.com/a/40579984/7355106 – rappongy Jul 16 '20 at 06:49
  • Does this answer your question? [Celery: clean way of revoking the entire chain from within a task](https://stackoverflow.com/questions/23793928/celery-clean-way-of-revoking-the-entire-chain-from-within-a-task) – rappongy Jul 16 '20 at 06:52

3 Answers


In my opinion this is a common use-case that doesn't get enough love in the documentation.

Assuming you want to abort a chain mid-way while still reporting SUCCESS as the status of the completed tasks, and without sending any error log or whatnot (otherwise you can just raise an exception), then a way to accomplish this is:

@app.task(bind=True)  # Note that we need bind=True for self to work
def task1(self, other_args):
    #do_stuff
    if end_chain:
        self.request.callbacks = None
        return
    #Other stuff to do if end_chain is False

So in your example:

@app.task(ignore_result=True, bind=True)
def is_room_open(self, args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        self.request.callbacks = None

Will work. Note that instead of subtask((...), immutable=True) you can use the shortcut .si(), as stated by @AbbasovAlexander.

Edited to work with EAGER mode, as suggested by @PhilipGarnero in the comments.
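As the comments below note, Celery 4.0 moved the pending chain from `request.callbacks` to `request.chain`. A version-tolerant sketch (the helper name `end_chain` is my own, not part of any Celery API) could simply clear both attributes:

```python
def end_chain(task):
    # Celery < 4.0 stores the remaining tasks of a chain on
    # request.callbacks; Celery >= 4.0 stores them on request.chain.
    # Clearing both keeps the helper working across versions
    # (and assigning None, rather than slicing, also works in EAGER mode).
    task.request.callbacks = None
    task.request.chain = None
```

Call it as `end_chain(self)` from a `bind=True` task, in place of the bare `self.request.callbacks = None` shown above.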

AntonioMO
    If you are running tasks in EAGER mode, the above will stop the task. I replaced `self.request.callbacks[:] = []` by `self.request.callbacks = None` and it's now working in both cases. – PhilipGarnero May 07 '15 at 17:03
  • If it works in both cases, lets suggest that then. Thanks for commenting to improve the answer :) – AntonioMO May 08 '15 at 07:56
    Apparently It doesn't work anymore for Celery 4.0, but `self.request.chain = None ` does. http://stackoverflow.com/questions/23793928/celery-clean-way-of-revoking-the-entire-chain-from-within-a-task/40579984#40579984 – Bruno Henrique Nov 15 '16 at 13:42
    Both self.request.callbacks = None and self.request.chain = None did not work for me. I worked around the issue by returning a boolean indicating whether it should be aborted, and accepted that as argument in the follow-up tasks of the chain. – Andreas Profous Oct 08 '20 at 13:46

It's unbelievable that such a common case isn't treated in any official documentation. I had to cope with the same issue (but using shared_task with the bind option, so we have visibility of the self object), so I wrote a custom decorator that handles the revocation automatically:

from functools import wraps

def revoke_chain_authority(a_shared_task):
    """
    @see: https://gist.github.com/bloudermilk/2173940
    @param a_shared_task: a @shared_task(bind=True) celery function.
    @return:
    """
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested as e:
            # Drop subsequent tasks in chain (if not EAGER mode)
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value

    return inner
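The decorator catches a `RevokeChainRequested` exception that isn't defined in the answer; a minimal definition, assuming it only needs to carry the value the aborted chain should return, might look like:

```python
class RevokeChainRequested(Exception):
    """Raised inside a task to stop the remaining tasks in the chain."""

    def __init__(self, return_value):
        super(RevokeChainRequested, self).__init__(return_value)
        # The value the decorator returns on behalf of the aborted chain.
        self.return_value = return_value
```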

You can use it as follows:

@shared_task(bind=True)
@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    #...

    if condition:
        raise RevokeChainRequested(False)

See the full explanation here. Hope it helps!

Alex Gidan
  • It seems that now the `callbacks` variable is a tuple, so `self.request.callbacks[:] = []` raises `TypeError: 'tuple' object does not support item assignment` – mccc May 23 '16 at 13:41

Firstly, it seems that if an exception is raised inside the function, ignore_result doesn't help you.

Secondly, you use immutable=True. It means that the next function (in our case notify) does not receive the result of the previous task as an additional argument. You should use notify.subtask((args_sub_2, ), immutable=False), of course only if it suits your design.

Third, you can use shortcuts:

notify.si(args_sub_2) instead of notify.subtask((args_sub_2, ), immutable=True)

and

is_room_open.s(args_sub_1) instead of is_room_open.subtask((args_sub_1, ))

Try this code:

@task
def check_orders():
    # check all the orders and send out appropriate notifications
    grouped_subs = []

    for thingy in things:
       ...

        grouped_subs.append(chain(is_room_open.s(args_sub_1), 
                                  notify.s(args_sub_2)))

    res = group(grouped_subs).apply_async()

    res.join()         #[1]
    logger.info('Done checking orders at %s' % current_task.request.id)

@task
def is_room_open(args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        # [2]
        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
        # None of the following things work:
        # is_room_open.update_state(state='FAILURE')
        # raise celery.exceptions.Ignore()
        # raise Exception('spam', 'eggs')
        # current_task.request.callbacks[:] = []
        return False

@task
def notify(result, args_sub_2):
    if result:
        # something else time consuming, only do this if the first part of the chain 
        # passed a test (the chained tasks before this were 'successful'
        notify_user(args_sub_2)
        return True
    return False
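This works because a chain of plain `.s()` signatures prepends each task's return value to the next task's arguments. A plain-Python sketch of that data flow (no Celery involved; the room names and one-argument signatures are made up for illustration):

```python
def run_chain(tasks, *args):
    # Mimic how a Celery chain threads each task's return value
    # into the next task's first positional argument.
    result = tasks[0](*args)
    for task in tasks[1:]:
        result = task(result)
    return result

def is_room_open(room):
    return room == "open_room"

def notify(result):
    # Only notify when the previous task in the chain returned True.
    return "notified" if result else "skipped"

run_chain([is_room_open, notify], "open_room")    # -> "notified"
run_chain([is_room_open, notify], "closed_room")  # -> "skipped"
```

Note that the second task still runs either way; it just gets told whether to do its work, which is exactly the overhead the question wants to avoid.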

If you want to catch exceptions you must use an error callback, like so:

is_room_open.apply_async((args_sub_1, ), link_error=log_error.s())

import os

from proj.celery import celery

@celery.task
def log_error(task_id):
    result = celery.AsyncResult(task_id)
    result.get(propagate=False)  # make sure result written.
    with open(os.path.join('/var/errors', task_id), 'a') as fh:
        fh.write('--\n\n%s %s %s' % (
            task_id, result.result, result.traceback))
Abbasov Alexander
  • Thank you for the tips about shortcuts. Although this would work, it doesn't solve my problem. I want the second task to not ever execute if the first one fails. This solution still has the overhead of starting the second task every time independent of the results of the first task. I want to stop execution of the chain. – Salami Jul 08 '13 at 17:15
  • I understood you. If task raised an exception execution of the chain will stop. Its behavior by default. You don't need to search special decision for it. – Abbasov Alexander Jul 08 '13 at 21:01
  • @Alexander, raising the exception is NOT working correctly. "When I try the things I have commented out after [2] (raising exceptions, removing callbacks)... we get stuck on the join() in check_orders for some reason (it breaks the group)." – Salami Jul 10 '13 at 20:10