130

I have been reading the doc and searching but cannot seem to find a straight answer:

Can you cancel a task that is already executing (that is, the task has started, takes a while, and needs to be cancelled halfway through)?

I found this in the Celery FAQ:

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

But it is unclear to me whether this only cancels queued tasks or whether it also kills a process that is already running on a worker. Thanks for any light you can shed!

Shapi
dcoffey3296

8 Answers

239

revoke cancels the task execution. If a task is revoked, the workers ignore it and do not execute it. Unless you use persistent revokes, the list of revoked tasks is kept only in worker memory, so a revoked task can still be executed after the worker restarts.

https://docs.celeryq.dev/en/stable/userguide/workers.html#worker-persistent-revokes

revoke has a terminate option, which is False by default. To kill a task that is already executing, set terminate to True.

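>>> # Celery < 5 only: the celery.task package was removed in Celery 5.0.
>>> # On newer versions, call app.control.revoke(task_id, terminate=True) on your app instance.
>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)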

https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks
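The two behaviours described above can be wrapped in one small helper. This is only a sketch: `cancel_task`, `kill_running` and `sig` are hypothetical names introduced here, and `app` is assumed to be your configured `celery.Celery` instance. The only Celery call used is `app.control.revoke`.

```python
def cancel_task(app, task_id, kill_running=False, sig="SIGTERM"):
    """Revoke a task; optionally also signal the process that is executing it.

    `app` is assumed to be a configured celery.Celery instance.
    """
    if kill_running:
        # terminate=True asks the worker to signal the process running the task.
        app.control.revoke(task_id, terminate=True, signal=sig)
    else:
        # Without terminate, workers only skip the task if it has not started yet.
        app.control.revoke(task_id)
```

With `kill_running=False` a task that is already running keeps running; with `kill_running=True` the worker process executing it receives the given signal.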

mher
  • 2
    Does this work in a distributed env? I mean if I have workers on multiple machines that are executing tasks. Does celery keep track of which machine the task is executing on? – ksrini Mar 27 '13 at 13:55
  • 1
    It does. The communication with workers takes place via the broker. – mher Mar 28 '13 at 17:46
  • 6
    result.revoke(terminate=True) should do the same thing as revoke(task_id, terminate=True) – CamHart Jul 15 '14 at 00:59
  • 16
    Also, using the terminate option is "a last resort for administrators", as per recent Celery docs. You run the risk of terminating another task which has recently started on that worker. – kouk Apr 14 '15 at 12:39
  • Unfortunately, this works only with AMQP and Redis [doc](http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#std:control-revoke) – Ja8zyjits Mar 24 '16 at 06:19
  • @Ja8zyjits Why does this only work with AMQP and Redis? – freethebees Apr 25 '17 at 15:34
  • 2
    It doesn't work. Only this works: `from proj.celery import app` followed by `app.control.revoke(task_id)`
    – A.Raouf Nov 18 '17 at 14:16
  • From the command line, this worked like a charm using the hints from this answer. I needed to stop some actively-executing tasks that were running way longer than desired: `celery -A inspect active | grep “” | sed -e "s/.*id': '\([^']*\).*/\1/" | xargs celery -A control terminate KILL` – berto Dec 11 '19 at 15:59
  • Is there a way to revoke and DELETE tasks other than from the command prompt? – parfeniukink Jan 14 '21 at 10:32
59

In Celery 3.1, the API for revoking tasks changed.

According to the Celery FAQ, you should use result.revoke:

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

or if you only have the task id:

>>> from proj.celery import app
>>> app.control.revoke(task_id)
Rockallite
38

@0x00mh's answer is correct. However, recent Celery docs describe the terminate option as "a last resort for administrators", because you may accidentally terminate another task that started executing in the meantime. A possibly better solution is to combine terminate=True with signal='SIGUSR1', which causes the SoftTimeLimitExceeded exception to be raised in the task.
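To show why an exception raised by a signal is friendlier than a hard kill, here is a stdlib-only sketch that does not use Celery at all. `SoftStop` is a hypothetical stand-in for `celery.exceptions.SoftTimeLimitExceeded`, and the timer stands in for a `revoke(task_id, terminate=True, signal='SIGUSR1')` arriving mid-task. It assumes a Unix-like system and must run in the main thread.

```python
import os
import signal
import threading
import time


class SoftStop(Exception):
    """Hypothetical stand-in for celery.exceptions.SoftTimeLimitExceeded."""


def _on_sigusr1(signum, frame):
    # Raising here surfaces the exception inside whatever code is running.
    raise SoftStop()


def long_task(log):
    try:
        for _ in range(50):      # roughly 5 seconds of "work"
            time.sleep(0.1)
        log.append("finished")
    except SoftStop:
        log.append("interrupted")
    finally:
        log.append("cleanup")    # runs whether or not the task was interrupted
    return log


def run_demo():
    previous = signal.signal(signal.SIGUSR1, _on_sigusr1)
    # Simulate the signal arriving while the task is still running.
    timer = threading.Timer(0.3, os.kill, args=(os.getpid(), signal.SIGUSR1))
    timer.start()
    try:
        return long_task([])
    finally:
        timer.cancel()
        signal.signal(signal.SIGUSR1, previous)
```

In a real task body you would catch `SoftTimeLimitExceeded` instead of `SoftStop`; the `try`/`except`/`finally` shape stays the same.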

gogasca
kouk
  • 4
    This solution worked very well for me. When `SoftTimeLimitExceeded` is raised in my task, my custom cleanup logic (implemented via `try`/`except`/`finally`) is invoked. This is much better, in my view, than what `AbortableTask` offers (http://docs.celeryproject.org/en/latest/reference/celery.contrib.abortable.html). With the latter, you need a database result backend _and_ you have to manually and repeatedly check the status of an ongoing task to see if it's been aborted. – David Schneider Mar 19 '18 at 23:36
  • 5
    How is this better? As far as I understand, if another task has been picked up by the process, it is going to be stopped anyway; just a different exception will be thrown. – marxin Dec 14 '18 at 13:59
  • If I use `worker_prefetch_multiplier = 1` because I only have a few long-running tasks, terminate should be fine, since no other tasks will be affected by terminating. Did I get this correct? @spicyramen – maffe Feb 11 '19 at 08:26
6

Per the 5.2.3 documentation, the following command can be run:

    celery.control.revoke(task_id, terminate=True, signal='SIGKILL')

where celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

Link to the doc: https://docs.celeryq.dev/en/stable/reference/celery.app.control.html?highlight=revoke#celery.app.control.Control.revoke

Amogh Joshi
5

Define the Celery app with a broker and a backend, something like:

from celery import Celery
celeryapp = Celery('app', broker=redis_uri, backend=redis_uri)

send_task returns an AsyncResult whose id attribute is the task's unique id:

task_id = celeryapp.send_task('run.send_email', queue="demo").id

To revoke the task, you need the Celery app and the task id:

celeryapp.control.revoke(task_id, terminate=True)
omkar more
4

There is also another, less satisfactory way to stop a task: aborting it. It has several reliability caveats; for details, see: http://docs.celeryproject.org/en/latest/reference/celery.contrib.abortable.html
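The idea behind aborting is cooperative: the task itself checks periodically whether it has been asked to stop. The sketch below shows that pattern with the standard library only. `process_items` and `run_demo` are hypothetical names, and a `threading.Event` stands in for the abort state that `AbortableTask.is_aborted()` would read from the result backend.

```python
import threading
import time


def process_items(items, abort_flag, done):
    """Process items one by one, checking the abort flag between items."""
    for item in items:
        if abort_flag.is_set():
            return "aborted"
        time.sleep(0.01)          # placeholder for real work on one item
        done.append(item)
    return "completed"


def run_demo():
    abort_flag = threading.Event()
    done, outcome = [], []
    worker = threading.Thread(
        target=lambda: outcome.append(process_items(range(1000), abort_flag, done))
    )
    worker.start()
    time.sleep(0.05)
    abort_flag.set()              # the caller's "abort" request
    worker.join()
    return outcome[0], len(done)
```

The task can only stop at the points where it checks the flag, which is one reason this approach is less reliable than it may look.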

xiaopo
  • This seems to be the best way to do it when using the 'threads' pool, as celery.control.revoke(task_id, terminate=True, signal='SIGKILL') does not work. – geometrikal May 17 '23 at 06:03
2
from celery.app import default_app

# The keyword argument is terminate, not terminated.
revoked = default_app.control.revoke(task_id, terminate=True, signal='SIGKILL')
print(revoked)
  • As it’s currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center. – JayPeerachai Feb 24 '22 at 04:59
1

See the task options time_limit and soft_time_limit (they can also be set for workers). If you want to control more than execution time, see the expires argument of the apply_async method.
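As a rough illustration, worker-wide limits could look like the configuration fragment below. The module name `celeryconfig.py` and the values are assumptions chosen for the example; `task_soft_time_limit` and `task_time_limit` are the setting names used by Celery 4 and later.

```python
# celeryconfig.py (hypothetical module name; values are illustrative only)

# Soft limit in seconds: SoftTimeLimitExceeded is raised inside the task,
# giving it a chance to clean up.
task_soft_time_limit = 60

# Hard limit in seconds: the process running the task is killed and replaced.
task_time_limit = 90

# Per-call expiry is passed when sending the task instead, for example:
#   add.apply_async(args=[2, 2], expires=60)
```

Keeping the soft limit below the hard limit leaves the task some time to react before it is killed.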

jjcf89
simplylizz