I am trying to prioritize certain tasks using celery (v5.0.0) but it seems I am missing something fundamental. According to the documentation, task priority should be available for RabbitMQ. However, whenever I try to add the relevant lines to the configuration file, task execution stops working. To illustrate my problem, I will first outline my setup without the prioritization.
My setup consists of a configuration file, celeryconfig.py:
from kombu import Exchange, Queue
broker_url = 'amqp://user:password@some-rabbit//'
result_backend = 'rpc://'
Then I have a file for the task definitions, prio.py, which contains three tasks used to time execution:
import time

from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')


@app.task
def get_time():
    # record and return the start time
    t0 = time.time()
    return t0


@app.task
def wait(t0):
    # simulate some work and pass the start time through
    time.sleep(0.2)
    return t0


@app.task
def time_delta(t0):
    # simulate some work and return the elapsed time since t0
    time.sleep(0.2)
    t1 = time.time()
    return t1 - t0
And finally, the code to start the tasks (in start_tasks.py). Here, I have created two identical paths (get the time t0, wait twice, get the time t1, and calculate t1 - t0) that I later want to prioritize, so that the first path finishes before the second one starts.
from celery import chain, group, Celery
from prio import get_time, time_delta, wait
tasks = chain(get_time.s(),
              group([
                  chain(wait.s(), wait.s(), time_delta.s()),
                  chain(wait.s(), wait.s(), time_delta.s()),
              ]))
res = tasks.apply_async()
print(res.get())
This will output
>>> [1.0178837776184082, 1.2237505912780762]
I interpret the results to mean that tasks from the first and the second path run alternately, which explains the small difference between the two execution times. Since each path consists of three tasks that sleep for 0.2 s each (i.e. 0.6 s of work per path), I would ultimately like to achieve a result like [0.6, 1.2], where the first path finishes completely before the second one starts.
Now, to introduce prioritization, I have modified celeryconfig.py according to the documentation:
from kombu import Exchange, Queue
broker_url = 'amqp://user:password@some-rabbit//'
result_backend = 'rpc://'
task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10}),
]
task_queue_max_priority = 10
task_default_priority = 5
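From my reading of the docs, once such a queue exists, a priority can be given per call; what I eventually want to do is roughly the following sketch (the queue name and the priority value are just taken from my config above, nothing here is tested yet):
# sketch: send a single task to the priority queue with an explicit priority
result = get_time.apply_async(queue='tasks', routing_key='tasks', priority=9)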
With this change, however, the tasks do not seem to be executed at all, and I have no clue what to change to make it work. I have already tried passing the queue name to apply_async (res = tasks.apply_async(queue='tasks')), but that did not solve the problem. Any hints are welcome!
Edit: Since I still cannot get it to work, I tried to make the setup cleaner and clearer using Docker containers. I created a Docker image from the following minimal Dockerfile:
FROM python:latest
RUN pip install --quiet celery
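Since I mention celery v5.0.0 above, pinning the version in the Dockerfile would probably be cleaner (just a variant of the line above, not what I actually built):
RUN pip install --quiet celery==5.0.0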
The image is then built with docker build --tag minimalcelery . (the trailing dot is the build context).
I start RabbitMQ with:
docker run --network celery_network --interactive --tty --hostname my-rabbit --name some-rabbit --env RABBITMQ_DEFAULT_USER=user --env RABBITMQ_DEFAULT_PASS=password --rm rabbitmq
The client is started with:
docker run --name "client" --interactive --tty --rm --volume %cd%:/home/work --hostname localhost --network celery_network --env PYTHONPATH=/home/work/ minimalcelery /bin/bash -c "cd /home/work/ && python start_tasks.py"
The worker is started with:
docker run --name "worker" --interactive --tty --rm --volume %cd%:/home/work --hostname localhost --network celery_network --env PYTHONPATH=/home/work/ minimalcelery /bin/bash -c "cd /home/work/ && celery --app=prio worker"
prio.py was modified according to @ItayB's answer:
import time

from celery import Celery

app = Celery('tasks', backend='rpc://', broker='amqp://user:password@some-rabbit//')
app.config_from_object('celeryconfig')


@app.task
def get_time(**kwargs):
    t0 = time.time()
    return t0


@app.task
def wait(t0, **kwargs):
    time.sleep(0.2)
    return t0


@app.task
def time_delta(t0, **kwargs):
    time.sleep(0.2)
    t1 = time.time()
    return t1 - t0
With this setup and the "unprioritized" config, I get a result similar to the one I obtained before: [0.6443462371826172, 0.6746957302093506].
However, as before, if I use the additional settings for queueing/prioritization, I don't get any reply. These are the files I used:
celeryconfig.py:
from kombu import Exchange, Queue

task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10}),
]
task_queue_max_priority = 10
task_default_priority = 5
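For completeness: I have not configured any task_routes, so another thing I considered (but have not tested) is routing the tasks to the queue in the config instead of passing queue= at call time, roughly like this sketch:
# untested sketch: route all tasks defined in prio.py to the 'tasks' queue
task_routes = {
    'prio.*': {'queue': 'tasks', 'routing_key': 'tasks'},
}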
start_tasks.py:
from celery import Celery, chain, group
from prio import get_time, time_delta, wait
tasks = chain(get_time.s(),
              group([
                  chain(wait.s(), wait.s(), time_delta.s()),
                  chain(wait.s(), wait.s(), time_delta.s()),
              ]))
print("about to start the tasks.")
res = tasks.apply_async(queue='tasks', routing_key='tasks', priority=5)
print("waiting...")
print(res.get())
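Since res.get() simply blocks when nothing comes back, I will probably also add a timeout while debugging so the client raises instead of hanging forever (sketch; the timeout value is arbitrary):
print(res.get(timeout=30))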
Also, I tried to modify the command by which the worker is started:
docker run --name "worker" --interactive --tty --rm --volume %cd%:/home/work --hostname localhost --network celery_network --env PYTHONPATH=/home/work/ minimalcelery /bin/bash -c "cd /home/work/ && celery --app=prio worker --hostname=worker.tasks@%h --queues=tasks"
I am aware that this setup does not prioritize the "paths" differently. Currently, I just want to get an answer from the worker.
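If it helps with diagnosing this: my next step would be to check on the RabbitMQ side whether the messages actually reach the tasks queue, roughly like this (sketch, assuming the broker container is named some-rabbit as above):
docker exec some-rabbit rabbitmqctl list_queues name messages arguments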