I am quite new to Celery and I have been trying to set up a project with two separate queues (one to calculate and the other to execute). So far, so good.
My problem is that the workers in the execute queue need to instantiate a class with a unique object_id (one id per worker). I was wondering if I could write a custom worker initialization to initialize the object at start and keep it in memory until the worker is killed.
I found a similar question on custom_task but the proposed solution does not work in my case.
Consider the following toy example:
celery.py
from celery import Celery

app = Celery('proj',
             broker='amqp://guest@localhost//',
             backend='amqp://',
             include=['proj.tasks'])

app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=60,
    CELERY_ROUTES={"proj.tasks.add1": {"queue": "q1"}},
)

if __name__ == '__main__':
    app.start()
tasks.py
from proj.celery import app
from celery.signals import worker_init

@worker_init.connect(sender='worker1@hostname')
def configure_worker1(*args, **kwargs):
    # SETUP id=1 for add1 here???
    pass

@worker_init.connect(sender='worker2@hostname')
def configure_worker2(*args, **kwargs):
    # SETUP id=2 for add1 here???
    pass

@app.task
def add1(y):
    # `id` stands for the per-worker value that should be set at initialization
    return id + y

@app.task
def add(x, y):
    return x + y
Starting the workers:
celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info
Is this the right approach? If so, what should I write in the configure_worker1 function in tasks.py to set up id at worker initialization?
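To make the goal concrete, here is a minimal sketch of the behavior being asked for, not a confirmed solution. It assumes Celery's celeryd_init signal, whose sender is documented as the worker's node name, in place of worker_init. Executor, WORKER_IDS, configure_worker, and add1_body are hypothetical names introduced only for illustration, and whether an object created this way is visible inside the pool processes is an unverified assumption.

```python
# Hypothetical sketch: Executor, WORKER_IDS, configure_worker and add1_body
# are illustrative names, not part of the project above.
try:
    from celery.signals import celeryd_init
except ImportError:
    # Allows the sketch to be run without Celery installed.
    celeryd_init = None


class Executor:
    """Stand-in for the class that needs a unique object_id."""

    def __init__(self, object_id):
        self.object_id = object_id


# Assumed mapping from worker node name to the object_id it should use.
WORKER_IDS = {"worker1@hostname": 1, "worker2@hostname": 2}

# Module-level slot that holds the instance for the current worker.
executor = None


def configure_worker(sender=None, **kwargs):
    """Create the Executor for this worker, keyed on its node name."""
    global executor
    object_id = WORKER_IDS.get(sender)
    if object_id is not None:
        executor = Executor(object_id)


if celeryd_init is not None:
    # One receiver for all workers; it looks up the id from the sender.
    celeryd_init.connect(configure_worker)


def add1_body(y):
    """What the add1 task would compute once the worker is configured."""
    if executor is None:
        raise RuntimeError("worker was not configured with an object_id")
    return executor.object_id + y
```

In this sketch a single receiver handles every worker and reads the id from a lookup table, so adding a worker means adding one table entry instead of one more decorated function.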
Thanks