31

I've defined a Celery app in a module, and now I want to start the worker from the same module in its __main__, i.e. by running the module with python -m instead of celery from the command line. I tried this:

app = Celery('project', include=['project.tasks'])

# do all kind of project-specific configuration
# that should occur whenever this module is imported

if __name__ == '__main__':
    # log stuff about the configuration
    app.start(['worker', '-A', 'project.tasks'])

but now Celery thinks I'm running the worker without arguments:

Usage: worker <command> [options] 

Show help screen and exit.

Options:
  -A APP, --app=APP     app instance to use (e.g. module.attr_name)
[snip]

The usage message is the one you get from celery --help, as if it didn't get a command. I've also tried

app.worker_main(['-A', 'project.tasks'])

but that complains about the -A not being recognized.

So how do I do this? Or alternatively, how do I pass a callback to the worker to have it log information about its configuration?

Fred Foo

6 Answers

23

Using the app.worker_main method (v3.1.12):

± cat start_celery.py
#!/usr/bin/python

from myapp import app


if __name__ == "__main__":
    argv = [
        'worker',
        '--loglevel=DEBUG',
    ]
    app.worker_main(argv)
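
Note that the first element of the list fills the program-name slot (which is why 'worker' comes first); everything after it is parsed as worker options. That also seems to explain the failure in the question: the worker command already has the app bound, so it does not accept -A. Extra worker flags can be appended the same way; a minimal sketch, with an arbitrary concurrency value:

app.worker_main(['worker', '--loglevel=DEBUG', '--concurrency=2'])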
okocian
17

Since Celery 5, things have changed.

Calling worker_main now results in:

AttributeError: 'Celery' object has no attribute 'worker_main'

For Celery 5, do the following:

import celery

app = celery.Celery(
    'project',
    include=['project.tasks']
)

if __name__ == '__main__':
    worker = app.Worker(
        include=['project.tasks']
    )
    worker.start()

See celery.apps.worker and celery.worker.WorkController.setup_defaults for details (hopefully this will be documented better in the future).
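
The keyword arguments accepted by app.Worker follow the option names listed in setup_defaults, so worker options from the command line can be passed the same way. A minimal sketch, with placeholder values:

import celery

app = celery.Celery('project', include=['project.tasks'])

if __name__ == '__main__':
    worker = app.Worker(
        loglevel='INFO',  # corresponds to --loglevel
        concurrency=2,    # corresponds to --concurrency; placeholder value
    )
    worker.start()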

  • For anyone trying to do this in 2021, this is the approach that worked for me (Celery version 5.1). – categulario Jun 17 '21 at 23:31
  • Nice, but can you reload the whole thing? I tried worker.reload() and it didn't work. Also tried worker.stop() and worker.start(), but neither worked properly. – Nachokhan Apr 28 '22 at 16:11
  • Is there a way to check whether the worker is already running before initializing it and starting? Also, how do you specify what queue the worker should listen to with this method? – jacob Mar 16 '23 at 22:26
  • For specific queues, simply use the kwarg `queues` (e.g. `app.Worker(queues=["some_queue", "some_other_queue"])`). I still have not found a good way to see if a previously created worker process is still running on the host from within Python. Also, is there a way to call `worker.start()` without blocking? – jacob Mar 16 '23 at 22:41
13

Based on code from the django-celery module, you could try something like this:

from __future__ import absolute_import, unicode_literals

from celery import current_app
from celery.bin import worker


if __name__ == '__main__':
    app = current_app._get_current_object()

    # Use a distinct name to avoid shadowing the imported worker module.
    worker_instance = worker.worker(app=app)

    options = {
        'broker': 'amqp://guest:guest@localhost:5672//',
        'loglevel': 'INFO',
        'traceback': True,
    }

    worker_instance.run(**options)
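
Note that this relies on the old celery.bin.worker interface. Celery 5 rewrote the command-line layer, so this approach applies to Celery 3.x/4.x only; see the Celery 5 answer above for the current equivalent.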
daniula
  • It looks like this is the deprecated way of starting the worker, but their new way is too tangled up in Django code for me to figure it out. I've gone with this solution, thanks. – Fred Foo May 01 '14 at 12:10
  • Why deprecated? This code is run when you call `python manage.py celeryd`. It doesn't throw any warnings. – daniula May 01 '14 at 19:07
  • The code says it's the old way of running the worker. – Fred Foo May 02 '14 at 11:44
  • Now I see it. The alternative solution has the same concept: take the `djcelery/management/commands/celery.py` code and modify it so it always behaves as if `./manage.py celery worker` were called. – daniula May 02 '14 at 20:32
9

worker_main was put back in Celery 5.0.3: https://github.com/celery/celery/pull/6481

This worked for me on 5.0.4:

self.app.worker_main(argv=['worker', '--loglevel=info', '--concurrency={}'.format(os.environ['CELERY_CONCURRENCY']), '--without-gossip'])
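
The self.app above comes from the poster's own class; at module level the equivalent call looks like this (a standalone sketch, with the app name and the concurrency default as placeholders):

import os

from celery import Celery

app = Celery('project', include=['project.tasks'])

if __name__ == '__main__':
    app.worker_main(argv=[
        'worker',
        '--loglevel=info',
        '--concurrency={}'.format(os.environ.get('CELERY_CONCURRENCY', '2')),  # placeholder default
        '--without-gossip',
    ])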
dap
4

I think you're just missing wrapping the args so Celery can read them, like:

queue = Celery('blah', include=['blah'])
queue.start(argv=['celery', 'worker', '-l', 'info'])
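
This works because app.start hands argv to the umbrella celery command: 'celery' fills the program-name slot, so 'worker' is read as the subcommand. That seems to be exactly what was missing from the attempt in the question, where 'worker' was consumed as the program name and no command was left.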
nurieta
0

I would like to expand on Tomasz Hławiczka's answer for Celery 5+. As mentioned in that answer, the following works:

from celery import Celery

app = Celery() # args and kwargs as needed

if __name__ == "__main__":
    worker = app.Worker()
    worker.start()

However, I stumbled upon this answer while trying to run the Celery worker alongside a Flask app with the Flask-SocketIO run method. Specifically, I tried to set up my worker and then run my Flask app; however, worker.start() blocks, so the Flask app never ran. To solve this, I used the following:

import subprocess
from celery import Celery
from flask import Flask
from flask_socketio import SocketIO

cel = Celery() # args and kwargs as needed
app = Flask(__name__)
socketio = SocketIO(app, message_queue=<redis or rabbitmq here>)

if __name__ == "__main__":
    cmd = "celery -A project.cel worker -Q specific_queue".split(' ')
    subprocess.Popen(cmd)

    socketio.run(app)

Or more generally:

import subprocess
from celery import Celery

app = Celery() # args and kwargs as needed

if __name__ == "__main__":
    cmd = "celery -A project.app worker -Q specific_queue".split(' ')
    subprocess.Popen(cmd)

    # more code you need to run after starting worker

In addition, there are times when previously created workers are still running and you don't want to start more. This is particularly true during development. To check for other workers before starting another, you can do the following:

import subprocess
from celery import Celery

app = Celery() # args and kwargs as needed

if __name__ == "__main__":
    active_workers = app.control.inspect().active()

    if active_workers is None:
        cmd = "celery -A project.app worker -Q specific_queue".split(' ')
        subprocess.Popen(cmd)

    # more code you need to run after starting worker

If you are doing all this across multiple hosts and you need to check for a specific worker, do the following:

import subprocess
from celery import Celery

app = Celery() # args and kwargs as needed

if __name__ == "__main__":
    active_workers = app.control.inspect().active()

    if active_workers is None or "celery@hostname" not in active_workers:
        cmd = "celery -A project.app worker -Q specific_queue".split(' ')
        subprocess.Popen(cmd)

    # more code you need to run after starting worker

If you don't care about blocking, and don't want to use the subprocess library but you still want your worker to listen to a specific queue, do the following:

from celery import Celery

app = Celery() # args and kwargs as needed

if __name__ == "__main__":
    worker = app.Worker(queues=["specific_queue"])
    worker.start()

Of course, if you want multiple workers, each listening to its own queue, you'd have to use the subprocess method, since you cannot start another worker after the first start() call blocks; a sketch of that follows.
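
A minimal sketch of that multi-queue setup (the queue and project names are hypothetical; the -n flag gives each worker a unique node name so they don't clash on one host):

import subprocess

from celery import Celery

app = Celery()  # args and kwargs as needed

if __name__ == "__main__":
    # One worker process per queue; queue names are hypothetical.
    for q in ["queue_a", "queue_b"]:
        cmd = "celery -A project.app worker -Q {0} -n worker_{0}@%h".format(q).split(' ')
        subprocess.Popen(cmd)

    # more code you need to run after starting the workers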

jacob