
I am trying to prioritize certain tasks using celery (v5.0.0) but it seems I am missing something fundamental. According to the documentation, task priority should be available for RabbitMQ. However, whenever I try to add the relevant lines to the configuration file, task execution stops working. To illustrate my problem, I will first outline my setup without the prioritization.
My setup consists of a configuration file, celeryconfig.py:

from kombu import Exchange, Queue

broker_url = 'amqp://user:password@some-rabbit//'
result_backend = 'rpc://'

Then I have a file with the task definitions, prio.py, that contains three tasks used to time the execution:

import time

from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')


@app.task
def get_time():
    t0 = time.time()
    return t0


@app.task
def wait(t0):
    time.sleep(0.2)
    return t0


@app.task
def time_delta(t0):
    time.sleep(0.2)
    t1 = time.time()
    return t1 - t0

And finally, the code to start the tasks (in start_tasks.py). Here, I have created two identical paths (getting time t0, waiting twice, getting time t1, and calculating t1 - t0) that I later want to prioritize so that the first one finishes before the second starts.

from celery import chain, group

from prio import get_time, time_delta, wait

tasks = chain(
    get_time.s(),
    group([
        chain(wait.s(), wait.s(), time_delta.s()),
        chain(wait.s(), wait.s(), time_delta.s()),
    ]),
)
res = tasks.apply_async()
print(res.get())

This will output

[1.0178837776184082, 1.2237505912780762]

I interpret the results to mean that the tasks of the first and the second path run alternately, which results in the small difference between the execution times. In the end, I would like to achieve a result like [0.6, 1.2].
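A back-of-the-envelope check of this interpretation (plain Python, no Celery involved): six 0.2 s tasks share a single worker slot, either alternating between the two paths or running the first path to completion before the second starts:

```python
# Plain-Python sketch of the timing argument; no Celery involved.
STEP = 0.2  # sleep duration of each wait/time_delta task

# Alternating order: wait1, wait2, wait1, wait2, delta1, delta2.
# Path 1's time_delta finishes after 5 slots, path 2's after all 6.
alternating = [round(5 * STEP, 1), round(6 * STEP, 1)]

# Prioritized order: path 1 runs to completion (3 slots) before path 2 starts.
prioritized = [round(3 * STEP, 1), round(6 * STEP, 1)]

print(alternating)  # [1.0, 1.2] -- close to the measured values
print(prioritized)  # [0.6, 1.2] -- the desired result
```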
Now, to introduce prioritization, I have modified the celeryconfig.py according to the documentation:

from kombu import Exchange, Queue

broker_url = 'amqp://user:password@some-rabbit//'
result_backend = 'rpc://'

task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks', queue_arguments={'x-max-priority': 10}),
]
task_queue_max_priority = 10
task_default_priority = 5

With this change, however, the tasks do not seem to be executed at all, and I have no clue what to change in order to make it work. I have already tried to pass the queue name to apply_async (res = tasks.apply_async(queue='tasks')) but that did not solve the problem. Any hints are welcome!

Edit: Since I still cannot get it to work, I tried to make the setup cleaner and clearer using Docker containers. I created a Docker image via the following minimalistic Dockerfile:

FROM python:latest

RUN pip install --quiet celery

The image is then built via `docker build --tag minimalcelery .`.

I start RabbitMQ with:

docker run --network celery_network --interactive --tty --hostname my-rabbit --name some-rabbit --env RABBITMQ_DEFAULT_USER=user --env RABBITMQ_DEFAULT_PASS=password --rm rabbitmq

The client is started with:

docker run --name "client" --interactive --tty --rm --volume %cd%:/home/work --hostname localhost --network celery_network --env PYTHONPATH=/home/work/ minimalcelery /bin/bash -c "cd /home/work/ && python start_tasks.py"

The worker is started with:

docker run --name "worker" --interactive --tty --rm --volume %cd%:/home/work --hostname localhost --network celery_network --env PYTHONPATH=/home/work/ minimalcelery /bin/bash -c "cd /home/work/ && celery --app=prio worker"

prio.py was modified according to @ItayB's answer:


import time

from celery import Celery

app = Celery('tasks', backend='rpc://', broker='amqp://user:password@some-rabbit//')
app.config_from_object('celeryconfig')


@app.task
def get_time(**kwargs):
    t0 = time.time()
    return t0


@app.task
def wait(t0, **kwargs):
    time.sleep(0.2)
    return t0


@app.task
def time_delta(t0, **kwargs):
    time.sleep(0.2)
    t1 = time.time()
    return t1 - t0

With this setup and the "unpriorized" config, I get a result similar to the one I previously obtained: [0.6443462371826172, 0.6746957302093506].

However, as before, if I use the additional settings for queueing/prioritization, I don't get any reply. These are the files I used:
celeryconfig.py:

from kombu import Exchange, Queue

task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks', queue_arguments={'x-max-priority': 10}),
]
task_queue_max_priority = 10
task_default_priority = 5
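One possibly missing piece (an assumption on my part, not something verified in the thread): with a chain/group, the queue passed to apply_async is not inherited by the child tasks, so they may end up on the default celery queue that no worker is consuming here. A task_routes entry in celeryconfig.py would route every task from prio.py to the priority queue regardless:

```python
# Assumed addition to celeryconfig.py: route all tasks defined in prio.py
# (their registered names start with 'prio.') to the priority queue.
task_routes = {'prio.*': {'queue': 'tasks', 'routing_key': 'tasks'}}
```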

start_tasks.py:

from celery import chain, group

from prio import get_time, time_delta, wait

tasks = chain(
    get_time.s(),
    group([
        chain(wait.s(), wait.s(), time_delta.s()),
        chain(wait.s(), wait.s(), time_delta.s()),
    ]),
)
print("about to start the tasks.")
res = tasks.apply_async(queue='tasks', routing_key='tasks', priority=5)
print("waiting...")
print(res.get())

Also, I tried to modify the command by which the worker is started:

docker run --name "worker" --interactive --tty --rm --volume %cd%:/home/work --hostname localhost --network celery_network --env PYTHONPATH=/home/work/ minimalcelery /bin/bash -c "cd /home/work/ && celery --app=prio worker --hostname=worker.tasks@%h --queues=tasks"

I am aware that this setup does not prioritize the "paths" differently. Currently, I just want to get an answer from the worker.

Jan

1 Answer


It seems like you're almost there. It's been a while since I did this, but I'll try (based on some snippets I have):

  1. Change your celery tasks to support kwargs:

def get_time(**kwargs):

  2. Pass the priority when you set the signature:

get_time.s(kwargs={"priority": 3})  # set a value below x-max-priority

I'm not sure if it's a must, but I've also defined the task as immutable:

get_time.s(immutable=True, kwargs={"priority": 3})

  3. Set worker_prefetch_multiplier to 1. The default is 4, which means that four tasks are prefetched together, so I think there won't be prioritization between them (AFAIR).
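For reference, point 3 is a single setting in celeryconfig.py:

```python
# With the default prefetch multiplier of 4, each worker process reserves
# several messages at once and works through them in the order it received
# them, which defeats broker-side priority ordering.
worker_prefetch_multiplier = 1
```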

Good luck!

ItayB
  • First of all, thanks for your prompt answer! And sorry it took so long to report back. Unfortunately, I still could not get it to work. Your suggestions are very valuable for actually prioritizing the tasks, but I am currently struggling to get an answer at all with the setup containing queues etc. To make the setup clearer, I edited my question. I assume that I am missing some very crucial parameter somewhere, but cannot figure out which one and where it is missing. – Jan Jan 02 '21 at 23:55
  • @Jan first, no problem. You can thank me by accepting the answer (and or upvoting). It's hard to track if you're changing your original question - next time it will be better to open a new one instead. I'll try to look later on to see if I can help – ItayB Jan 03 '21 at 06:44
  • I did not change the question. I merely tried to make my problem clearer. Also in the original post, I described the problem that I do not get any results whenever I try to use Queues. – Jan Jan 03 '21 at 20:17
  • you add the `priority` to `apply_async` instead of `s` (signature) - can you try to add the priority to those signatures? the priority is per task and not per canvas – ItayB Jan 03 '21 at 20:36
  • The problem was not with the priorities but with the queues. However, your last comment made me aware that Celery does not pass the options given to apply_async to the underlying tasks. Therefore, each subtask must be given the name of the queue it should be processed in. Passing the options in the signature as you suggested does not work, as explained in https://stackoverflow.com/a/24137886/10753602. Instead, it must be set after creation of the signature as described in the linked thread (or via the subtask method, as explained here: https://stackoverflow.com/a/14959096/10753602). – Jan Jan 05 '21 at 10:08
  • I think that the reason that it's not working for you is the mutability of your signatures. If you try a simple app that triggers dozens of tasks A with priority 6 and dozens of tasks B with priority 7, you will see the priority working. – ItayB Jan 05 '21 at 10:41
  • @Jan any updates? did you manage to do that? – ItayB Jan 16 '21 at 09:50