10

Part 1

I've read and tried various SO threads on purging Celery tasks when using Redis as the broker, but none of them worked. Please let me know how to purge tasks in Celery with Redis as the broker.

Part 2

Also, I have multiple queues. I can run everything from within the project directory, but when daemonizing, the workers don't pick up tasks; I still need to start the Celery workers manually. How can I daemonize it?

Here is my celeryd config.

# Names of the nodes to start (four nodes here)
CELERYD_NODES="w1 w2 w3 w4"


CELERY_BIN="/usr/local/bin/celery"

# Where to chdir at start.
CELERYD_CHDIR="/var/www/fractal/parser-quicklook/"

# Python interpreter from environment, if using virtualenv
#ENV_PYTHON="/somewhere/.virtualenvs/MyProject/bin/python"

# How to call "manage.py celeryd_multi"
#CELERYD_MULTI="/usr/local/bin/celeryd-multi"

# How to call "manage.py celeryctl"
#CELERYCTL="/usr/local/bin/celeryctl"

#CELERYBEAT="/usr/local/bin/celerybeat"

# Extra arguments to celeryd
CELERYD_OPTS="--time-limit=300 --concurrency=8  -Q BBC,BGR,FASTCOMPANY,Firstpost,Guardian,IBNLIVE,LIVEMINT,Mashable,NDTV,Pandodaily,Reuters,TNW,TheHindu,ZEENEWS "

# Name of the celery config module, don't change this.
CELERY_CONFIG_MODULE="celeryconfig"

# %n will be replaced with the nodename.
CELERYD_LOG_FILE="/var/log/celery/%n.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"

# Workers should run as an unprivileged user.
#CELERYD_USER="nobody"
#CELERYD_GROUP="nobody"

# Set any other env vars here too!
PROJECT_ENV="PRODUCTION"

# Name of the projects settings module.
# in this case is just settings and not the full path because it will change the dir to
# the project folder first.
CELERY_CREATE_DIRS=1

The celeryconfig is already provided in Part 1.

Here is my proj directory structure.

project
|-- main.py
|-- project
|   |-- celeryconfig.py
|   |-- __init__.py
|-- tasks.py

How can I daemonize the workers with these queues? I have provided the queues in CELERYD_OPTS as well.

Is there a way to dynamically daemonize workers for any number of queues in Celery? For example, we have CELERY_CREATE_MISSING_QUEUES = True for creating missing queues. Is there something similar for daemonizing the Celery queues?

Praful Bagai
  • 16,684
  • 50
  • 136
  • 267
  • as for daemonizing: please open another question. also, please read the docs http://celery.readthedocs.org/en/latest/tutorials/daemonizing.html – Capi Etheriel Feb 19 '15 at 13:08
  • I followed the doc, and set everything up in the given manner. But when I view `sudo service celeryd status` it says `celeryd not running (no pidfile)`. How can I solve it? – Praful Bagai Feb 20 '15 at 10:37

5 Answers

14

celery purge should be enough to clean up the queue in Redis. However, each worker has its own reserved tasks, and it will send them back to the queue when you stop it. So first stop all the workers, then run celery purge.
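In shell terms, assuming an init script like the one in the question and a project named `proj` (a placeholder), the sequence would look roughly like this:

```shell
# Stop the workers first, so they don't push their reserved tasks back
# into the queue after it has been purged.
sudo service celeryd stop

# Purge the default queue; -f skips the interactive confirmation prompt.
celery -A proj purge -f

# Restart the workers once the queue is empty.
sudo service celeryd start
```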

Capi Etheriel
  • 3,542
  • 28
  • 49
13

If you have several queues, celery purge will purge the default one. You can specify which queue(s) you would like to purge like so:

celery purge -A proj -Q queue1,queue2
nbeuchat
  • 6,575
  • 5
  • 36
  • 50
  • This has now changed, and the correct command is `celery -A proj purge -Q queue1,queue2`. `-A` is no longer supported as an argument for sub-commands. – Navid Khan Dec 10 '22 at 16:44
2

In response to Part 1, here is a programmatic solution to purge your queue. Further documentation can be found at the celery.app.control.purge docs.

from celery import Celery

app = Celery()
app.control.purge()
# or, via the older (deprecated) alias:
app.control.discard_all()
HDubz
  • 167
  • 1
  • 2
  • 14
1

This revokes all the tasks it can without terminating any processes. (To do so, add terminate=True to the revoke call, at your own risk.)

It takes a second or two to run, so it is not suitable for high-throughput code.

from myapp.celery import app as celery_app


celery_app.control.purge()

i = celery_app.control.inspect()
# scheduled(): tasks with an ETA or countdown
# active():    tasks currently running - probably not revokable without terminate=True
# reserved():  enqueued tasks - usually revoked by purge() above
for queues in (i.active(), i.reserved(), i.scheduled()):
    if queues is None:  # no workers replied to the inspect broadcast
        continue
    for task_list in queues.values():
        for task in task_list:
            # scheduled() nests the id under "request"; active()/reserved() don't
            task_id = task.get("request", {}).get("id", None) or task.get("id", None)
            celery_app.control.revoke(task_id)

Just .purge() and then revoking .scheduled() would probably have the same effect, to be honest; I haven't experimented extensively. But purge alone will not revoke tasks sitting in the queue with an ETA or countdown set.
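For reference, the id lookup in the loop above copes with two payload shapes. A standalone sketch, using hypothetical sample payloads (the field values are illustrative, not taken from a real broker):

```python
# Hypothetical payloads mimicking what inspect().active()/reserved()
# and inspect().scheduled() return per task.
active_task = {"id": "abc-123", "name": "tasks.parse"}        # active()/reserved() shape
scheduled_task = {"eta": "...", "request": {"id": "def-456"}} # scheduled() wraps the request

def extract_task_id(task):
    # scheduled() entries nest the id under "request"; active()/reserved()
    # entries carry it at the top level -- same lookup as in the loop above.
    return task.get("request", {}).get("id", None) or task.get("id", None)

print(extract_task_id(active_task))     # abc-123
print(extract_task_id(scheduled_task))  # def-456
```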

Credit to @kahlo's answer, which was the basis for this.

Chris
  • 5,664
  • 6
  • 44
  • 55
0

Starting with Celery v5, you should now use:

celery -A proj purge -Q queue1,queue2
Androz2091
  • 2,931
  • 1
  • 9
  • 26