I can't send tasks to Celery when trying to run two separate dedicated workers. I have gone through the docs and this question, but neither improved my situation.
My configuration is the following:
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_ROUTES = {
    'books.tasks.resize_book_photo': {
        'queue': 'media',
        'routing_key': 'media',
    },
}
Tasks are defined in the following way in the tasks.py file:
import logging
import time

from celery import shared_task

from books.models import Author, Book
from books.commands import resize_book_photo as resize_book_photo_command

logger = logging.getLogger(__name__)


@shared_task
def list_test_books_per_author():
    time.sleep(5)
    queryset = Author.objects.all()
    for author in queryset:
        for book in author.testing_books:
            logger.info(book.title)


@shared_task
def resize_book_photo(book_id: int):
    resize_book_photo_command(Book.objects.get(id=book_id))
And they are called using apply_async:
list_test_books_per_author.apply_async()
resize_book_photo.apply_async((book.id,))
When I run celery flower I see that no tasks appear in queues.
The workers are started using:
celery -A blacksheep worker -l info --autoscale=10,1 -Q media --host=media@%h
celery -A blacksheep worker -l info --autoscale=10,1 -Q default --host=default@%h
Using redis-cli and the command 127.0.0.1:6379> LRANGE celery 1 100, I can confirm that the tasks end up under the celery key (which is the default one for Celery). No worker seems to consume them.
EDIT: After taking a closer look at this part of the documentation, I noticed that my naming was wrong. After changing the settings to:
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'
CELERY_TASK_DEFAULT_QUEUE = 'default'
# CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_ROUTES = {
    'books.tasks.resize_book_photo': {
        'queue': 'media',
        'routing_key': 'media',
    },
}
Situation improved: tasks are consumed from the default queue, but the task I want to go to the media queue also goes to the default queue.
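Since renaming two settings already changed the behaviour, it may be worth checking whether any other old-style names are left. The sketch below is only a plain-Python sanity check, not part of Celery: it assumes the app loads Django settings with namespace='CELERY', and the helper name find_old_style_names and the sample settings dict are hypothetical. The name pairs follow the old-to-new setting renames listed in the Celery configuration docs.

```python
# Old-style names (with the CELERY_ prefix used in Django settings) mapped to
# the names expected when settings are loaded with namespace='CELERY'.
# Assumption: the app calls config_from_object(..., namespace='CELERY').
RENAMED = {
    "CELERY_DEFAULT_QUEUE": "CELERY_TASK_DEFAULT_QUEUE",
    "CELERY_DEFAULT_EXCHANGE_TYPE": "CELERY_TASK_DEFAULT_EXCHANGE_TYPE",
    "CELERY_DEFAULT_ROUTING_KEY": "CELERY_TASK_DEFAULT_ROUTING_KEY",
    "CELERY_QUEUES": "CELERY_TASK_QUEUES",
    "CELERY_ROUTES": "CELERY_TASK_ROUTES",
}


def find_old_style_names(settings: dict) -> dict:
    """Return {old_name: suggested_name} for every old-style key in settings."""
    return {name: RENAMED[name] for name in settings if name in RENAMED}


# Hypothetical snapshot of the settings after the first edit.
settings = {
    "CELERY_RESULT_BACKEND": "django-db",
    "CELERY_TASK_DEFAULT_QUEUE": "default",
    "CELERY_TASK_DEFAULT_ROUTING_KEY": "default",
    "CELERY_QUEUES": (),
    "CELERY_ROUTES": {"books.tasks.resize_book_photo": {"queue": "media"}},
}

suspects = find_old_style_names(settings)
for old, new in suspects.items():
    print(f"{old} -> {new}")
```

If the namespace assumption holds, the two names it reports are the ones that control the queue list and the routing table, which would be consistent with the task falling back to the default queue.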
EDIT2: I have tried to explicitly send a specific task to the other queue by changing its call to resize_book_photo.apply_async((book.id,), queue='media'). The task was correctly dispatched to the proper queue and consumed. I would, however, prefer this to be handled automatically, so that I don't have to specify the queue whenever I call apply_async.
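For reference, a minimal sketch of what automatic routing might look like in settings.py. It assumes the settings are loaded with namespace='CELERY', so the routing table is read from CELERY_TASK_ROUTES (and an explicit queue list, if one is needed, from CELERY_TASK_QUEUES). It also relies on Celery's default behaviour of creating queues that are named in a route but not declared, so no Queue objects are defined here. I have not verified this against my setup.

```python
# settings.py (sketch); names assume namespace='CELERY' when loading settings.

# Tasks without an explicit route go to this queue.
CELERY_TASK_DEFAULT_QUEUE = "default"

# Route a single task to the 'media' queue by its full task name.
CELERY_TASK_ROUTES = {
    "books.tasks.resize_book_photo": {"queue": "media"},
}
```

With a routing table like this in effect, resize_book_photo.apply_async((book.id,)) should no longer need the queue argument, while list_test_books_per_author would keep going to the default queue.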