
Is there an option to pass a variable to a Celery worker on start and use it inside the worker at execution time?

I'm writing a server that will be responsible for machine learning training and evaluation. I would like to dynamically start a new worker instance and pass it a variable that will be used to load a specific model inside the worker.

I found how to start a worker programmatically with the worker_main method from answers here.
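For reference, a minimal sketch of starting a worker this way (the myapp module and the model id are placeholders; worker_main blocks, so each worker runs in its own process):

from multiprocessing import Process

from myapp.celery import app  # the Celery application instance

def start_worker(model_id):
    # encode the model id in the worker node name (-n) so that a
    # signal handler can parse it back out of the hostname at startup
    app.worker_main(['worker', '--loglevel=INFO',
                     '-n', 'worker@{}'.format(model_id)])

# one worker per model, each in its own process
Process(target=start_worker, args=('my-model-id',)).start()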

I was thinking about two solutions:

  1. Set it as an environment variable. The problem with this solution is that it can be corrupted when two worker instances are created at the same time.

  2. Pass it as an argv, but I don't know how to read the variable inside the worker.
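Something like this should be possible with app.user_options (a sketch based on the Celery 4.x extending docs on adding new command-line options; the --model-id option and ModelIdStep are illustrative names):

from celery import Celery, bootsteps

app = Celery('myapp', broker='amqp://')

# register --model-id as a recognized worker command-line option
def add_model_id_argument(parser):
    parser.add_argument('--model-id', default=None,
                        help='Id of the model this worker should load.')
app.user_options['worker'].add(add_model_id_argument)

class ModelIdStep(bootsteps.Step):
    """Receives the parsed --model-id value when the worker starts."""

    def __init__(self, worker, model_id=None, **options):
        super().__init__(worker, **options)
        worker.app.model_id = model_id  # stash it for use at init time

app.steps['worker'].add(ModelIdStep)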


EDIT

I found this thread, but it only talks about accessing a custom parameter in a task. My question is about accessing it at worker initialization.

Inspired by this thread, I'll try celery signals: http://docs.celeryproject.org/en/latest/userguide/signals.html#worker-init

Konrad

2 Answers


Maybe my question wasn't accurate enough, but I found the answer myself using the docs and Stack Overflow threads.

I wanted to run a separate worker for a Keras model. At worker initialization I needed to load the model into memory, and in tasks the model was used for prediction.

My solution:

  1. Name the worker after the model_id (since the id is unique and I need only one worker per model)
  2. In a celeryd_after_setup signal handler, parse the worker name and set a global variable in the worker
  3. In a worker_process_init signal handler, load the model (in my case into static fields of the Grasper class)
  4. In the task, use the static fields of the Grasper class

Below is some code describing the solution exactly.

from celery.signals import worker_process_init, celeryd_after_setup
from celery.concurrency import asynpool

# my custom class holding static fields for the model and tokenizer
# (these could also be plain global variables, like model_id below)
from myapp.ml import Grasper

# give worker processes extra time to load the model at startup,
# otherwise the worker_process_init handler can be terminated mid-load
asynpool.PROC_ALIVE_TIMEOUT = 100.0
model_id = None

@celeryd_after_setup.connect
def set_model_id(sender, instance, **kwargs):
    # the worker is named after the model (e.g. -n worker@<model_id>),
    # so the model id is the part of the node name after '@'
    global model_id
    model_id = instance.hostname.split('@')[1]

@worker_process_init.connect
def configure_worker(signal=None, sender=None, **kwargs):
    # runs once in every pool process, so each child gets the model in memory
    Grasper.load_model(model_id)

Then in a Celery task you can use the Grasper class with the loaded model, for example as sketched below. This solution works, but I know there is room for improvement, so if you have some ideas, please comment.
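A minimal sketch of such a task (assuming Grasper exposes the loaded Keras model as a static field Grasper.model; adapt it to your own class):

from myapp.celery import app
from myapp.ml import Grasper

@app.task
def predict(features):
    # Grasper.load_model() already ran in this worker process via
    # worker_process_init, so the task only performs inference
    return Grasper.model.predict(features).tolist()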

Konrad

Environment variables are copied from parent to child when a subprocess is forked/spawned. This means a process can manipulate its own variables, but other processes cannot (it is possible, but exceptional; read this thread for some background: Is there a way to change the environment variables of another process in Unix?).

If there's concern about a race condition in your own code, you should consider a lock around the section where you mutate os.environ in the parent and spawn the worker. After the worker is spawned as a separate process, release the lock; you then have no fear of corrupting the child by modifying the parent's environment.
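A minimal sketch of that locking pattern (assuming the workers are launched via subprocess; the MODEL_ID variable name is illustrative):

import os
import subprocess
import threading

env_lock = threading.Lock()

def spawn_worker(model_id):
    # serialize the mutate-environment-then-spawn section so that
    # concurrent spawns cannot see each other's MODEL_ID
    with env_lock:
        os.environ['MODEL_ID'] = model_id
        proc = subprocess.Popen(['celery', 'worker', '-A', 'myapp'])
    # the child copied the environment during Popen(), so it is now
    # safe to change MODEL_ID again for the next worker
    return proc

Alternatively, subprocess.Popen accepts an env argument, which avoids mutating the parent's environment at all.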

Nino Walker
  • Thanks for the response, but I think I will stay with the worker_init signal, passing the ML model id as the queue name to the worker and then loading it in the signal callback. It allows spawning multiple workers at the same time, and I think it is a more sophisticated solution. – Konrad Dec 21 '18 at 21:42