I can't send tasks to Celery when trying to create two separate dedicated workers. I have gone through the docs and this question, but neither improved my situation.

My configuration is the following:

from kombu import Exchange, Queue

CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_ROUTES = {
    'books.tasks.resize_book_photo': {
        'queue': 'media',
        'routing_key': 'media',
    },
}
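For context, this assumes the standard Django integration, where a celery.py module creates the app and pulls every CELERY_-prefixed name from settings.py. A minimal sketch of such a file (the blacksheep project name is taken from the worker commands below; the rest is the usual boilerplate, not necessarily my exact setup):

import os

from celery import Celery

# Make sure Django settings are importable before the app is created.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'blacksheep.settings')

app = Celery('blacksheep')

# Load every setting whose name starts with CELERY_ from Django settings.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Discover tasks.py modules in all installed apps (e.g. books.tasks).
app.autodiscover_tasks()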

Tasks are defined in the following way in the tasks.py file:

import logging
import time

from celery import shared_task


from books.models import Author, Book
from books.commands import resize_book_photo as resize_book_photo_command


logger = logging.getLogger(__name__)


@shared_task
def list_test_books_per_author():
    time.sleep(5)
    queryset = Author.objects.all()
    for author in queryset:
        for book in author.testing_books:
            logger.info(book.title)


@shared_task
def resize_book_photo(book_id: int):
    resize_book_photo_command(Book.objects.get(id=book_id))

And they are called using apply_async:

list_test_books_per_author.apply_async()
resize_book_photo.apply_async((book.id,))

When I run Celery Flower I see that no tasks appear in the queues. (Screenshot: the Flower panel shows two active workers with no tasks.)

The workers are started using:

celery -A blacksheep worker -l info --autoscale=10,1 -Q media --hostname=media@%h
celery -A blacksheep worker -l info --autoscale=10,1 -Q default --hostname=default@%h
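Which queues each worker actually listens on can be double-checked with the built-in inspect command (both workers should report exactly one queue each):

celery -A blacksheep inspect active_queues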

What I can do, using redis-cli and the 127.0.0.1:6379> LRANGE celery 1 100 command, is confirm that the tasks end up under the celery key (which is the default one for Celery). No worker seems to consume them.
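With the Redis broker every named queue lives in its own list key, so the misrouting can also be spotted by comparing list lengths (a sketch; add -n with the broker DB number if it isn't 0):

redis-cli LLEN celery     # keeps growing: everything lands in the default list
redis-cli LLEN default    # stays empty
redis-cli LLEN media      # stays empty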

EDIT After taking a closer look at this part of the documentation I noticed that my naming was wrong. After changing the settings to:

from kombu import Exchange, Queue

CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'
CELERY_TASK_DEFAULT_QUEUE = 'default'
# CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_ROUTES = {
    'books.tasks.resize_book_photo': {
        'queue': 'media',
        'routing_key': 'media',
    },
}

The situation improved: tasks are consumed from the default queue, but the task I want to go to the media queue also ends up in default.

EDIT2 I have tried to explicitly tell a certain task to go to the other queue by changing its call to resize_book_photo.apply_async((book.id,), queue='media'). The task was correctly dispatched to the proper queue and consumed. I would, however, prefer this to be handled automatically so that I don't have to specify the queue whenever I call apply_async.
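For reference, routing options can apparently also be attached to the task itself, which would avoid repeating the queue at every call site. An untested sketch, relying on Celery treating decorator options such as queue and routing_key as per-task defaults:

@shared_task(queue='media', routing_key='media')
def resize_book_photo(book_id: int):
    resize_book_photo_command(Book.objects.get(id=book_id))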

gonczor
  • Have you tried changing celery -A blacksheep worker -l info --autoscale=10,1 -Q default --hostname=media@%h to celery -A blacksheep worker -l info --autoscale=10,1 -Q media --hostname=media@%h? The -Q param sets the queue name. – Paulo Henrique Dec 12 '19 at 15:59
  • Yes. And this isn't going to help, since the problem is with routing tasks to the proper queue from the backend, not with consuming them once they have been saved by the broker. – gonczor Dec 12 '19 at 16:06

1 Answer

Try CELERY_TASK_ROUTES instead of CELERY_ROUTES. This worked for me recently with the Django integration.

The explanation is buried in a comment on this question: How to route tasks to different queues with Celery and Django
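Roughly: with app.config_from_object('django.conf:settings', namespace='CELERY'), Celery strips the CELERY_ prefix and lower-cases the rest, so only names that map to real settings (task_routes, task_queues, task_default_queue, ...) take effect, and as far as I can tell CELERY_ROUTES is simply ignored. A sketch of the routing part of settings.py with the new-style names (assuming CELERY_TASK_QUEUES as the namespaced form of task_queues):

from kombu import Exchange, Queue

CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
CELERY_TASK_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_TASK_ROUTES = {
    'books.tasks.resize_book_photo': {
        'queue': 'media',
        'routing_key': 'media',
    },
}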

Jonah