
I am using celery to run long running tasks on Hadoop. Each task executes a Pig script on Hadoop which runs for about 30 mins - 2 hours.

My current Hadoop setup has four queues: a, b, c, and default. All tasks are currently being executed by a single worker which submits each job to a single queue.

I want to add 3 more workers which submit jobs to other queues, one worker per queue.

The problem is that the queue is currently hard-coded, and I wish to make it configurable per worker.

I searched a lot but I am unable to find a way to pass each celery worker a different queue value and access it in my task.

I start my celery worker like so.

celery -A app.celery worker

I wish to pass some additional arguments in the command-line itself and access it in my task but celery complains that it doesn't understand my custom argument.
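
For example, an invocation like this (using the option name I want to add) is rejected by celery as an unrecognized argument:

celery -A app.celery worker --hadoop-queue=a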

I plan to run all the workers on the same host by setting the --concurrency=3 parameter. Is there any solution to this problem?

Thanks!

EDIT

The current scenario is like this: every time I execute the task print_something with tasks.print_something.delay(), it only prints the hard-coded letter C.

@celery.task()
def print_something():
    print "C"

I need each worker to print a different letter based on the value I pass to it when starting it.

@celery.task()
def print_something():
    print "<Variable Value Per Worker Here>"

2 Answers


Hope this helps someone.

Several problems needed to be solved to make this work.

The first step involved adding support in celery for the custom parameter. If this is not done, celery will complain that it doesn't understand the parameter.

Since I am running celery with Flask, I initialize celery like so.

from celery import Celery

# `app` is the Flask application instance created elsewhere.
def configure_celery():
    app.config.update(
        CELERY_BROKER_URL='amqp://:@localhost:5672',
        RESULT_BACKEND='db+mysql://root:@localhost:3306/<database_name>'
    )
    celery = Celery(app.import_name, backend=app.config['RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    # Run every task inside the Flask application context.
    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

I call this function to initialize celery and store it in a variable called celery.

celery = configure_celery()

To add the custom parameter you need to do the following.

def add_hadoop_queue_argument_to_worker(parser):
    parser.add_argument(
        '--hadoop-queue', help='Hadoop queue to be used by the worker'
    )

The celery instance used below is the one obtained from the steps above.

celery.user_options['worker'].add(add_hadoop_queue_argument_to_worker)

The next step is to make this argument accessible in the worker. To do that, add a custom bootstep.

from celery import bootsteps

class HadoopCustomWorkerStep(bootsteps.StartStopStep):

    def __init__(self, worker, **kwargs):
        # Store the command-line value on the app so tasks can read it.
        worker.app.hadoop_queue = kwargs['hadoop_queue']

Inform celery to use this class when creating the workers.

celery.steps['worker'].add(HadoopCustomWorkerStep)

The tasks should now be able to access the variables.

@celery.task(bind=True)
def print_hadoop_queue_from_config(self):
    print self.app.hadoop_queue

Verify it by running the workers on the command-line.

celery -A app.celery worker --concurrency=1 --hadoop-queue=A -n aworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=B -n bworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=C -n cworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=default -n defaultworker@%h
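
In the real tasks, the per-worker value can then drive the Pig submission. Here is a minimal sketch; the task name, the script-path argument, and the subprocess invocation are illustrative assumptions, not the original code:

import subprocess

@celery.task(bind=True)
def run_pig_script(self, script_path):
    # Forward this worker's queue to Hadoop; Pig passes -D properties
    # given before the script on to the underlying MapReduce jobs.
    subprocess.check_call([
        'pig',
        '-Dmapreduce.job.queuename={}'.format(self.app.hadoop_queue),
        script_path,
    ])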
  • it is also possible to skip all the code and read an environment variable. `HADOOP_QUEUE=A celery -A app worker --pool=solo aworker@%h` – nurettin May 08 '20 at 12:57
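
For completeness, the environment-variable approach from the comment above could look like this (a minimal sketch; each worker process is started with its own HADOOP_QUEUE value):

import os

@celery.task()
def print_something():
    # HADOOP_QUEUE is set per worker, e.g.
    # HADOOP_QUEUE=A celery -A app.celery worker --pool=solo -n aworker@%h
    print(os.environ.get('HADOOP_QUEUE', 'default'))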

What I usually do is: after starting the workers (with no tasks executing yet), I use another script (say manage.py) that adds commands with parameters to start specific tasks, or the same task with different arguments.

In manage.py:

import click
from tasks import some_task

@click.command()
@click.argument('params', nargs=-1)
def run_task(params):
    some_task.apply_async(args=params)

if __name__ == '__main__':
    run_task()
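
Invoked from the shell, for example (the arguments here are placeholders):

python manage.py arg1 arg2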

And this will start the tasks as needed.

– VKolev