
I have a REST API written in Django, with an endpoint that queues a Celery task when you POST to it. The response contains the task id, which I'd like to use to test that the task is created and to get the result. So, I'd like to do something like:

def test_async_job(self):
    response = self.client.post("/api/jobs/", some_test_data, format="json")
    task_id = response.data['task_id']
    result = my_task.AsyncResult(task_id).get()
    self.assertEqual(result, ...)

I obviously don't want to have to run a Celery worker to run the unit tests; I expect to mock it somehow. I can't use CELERY_ALWAYS_EAGER, because that seems to bypass the broker altogether, preventing me from using AsyncResult to get the task by its id (as stated here).

Going through the Celery and Kombu docs, I've found that there is an in-memory transport for unit tests that would do what I'm looking for. I tried overriding the BROKER_URL setting to use it in the tests:

@override_settings(BROKER_URL='memory://')
def test_async_job(self):

But the behavior is the same as with the AMQP broker: it blocks the test waiting for the result. Any idea how I'm supposed to configure this broker to get it working in the tests?

Facundo Olano
  • You still need a worker even with the in-memory broker. Unfortunately, I don't think what you want to do is possible. You either need to start a worker to use with your tests, or run tasks synchronously using CELERY_ALWAYS_EAGER (in which case, as you discovered, you don't get an AsyncResult). – jrothenbuhler Mar 06 '14 at 22:20
  • Why do you need to access the task by its ID? This looks like a good candidate for unit testing. Why not test the function that spawns the task directly instead of doing it over HTTP? This way you get the `EagerResult` that has the same API as `AsyncResult`. – patrys Aug 27 '14 at 10:22
  • possible duplicate of [Unit testing with django-celery?](http://stackoverflow.com/questions/4055860/unit-testing-with-django-celery) – Guillaume Vincent Oct 03 '14 at 06:35
  • If you're using Python 2 you can use django-supervisor to start Celery for your unit tests (and auto-restart when code is changed). Unfortunately not compatible with Python 3. (Realize this isn't what you're asking but in case it is useful to anyone.) – Chris Nov 25 '15 at 17:53

4 Answers


You can specify the broker backend in your settings:

import sys

if 'test' in sys.argv[1:]:
    BROKER_BACKEND = 'memory'
    CELERY_TASK_ALWAYS_EAGER = True
    CELERY_TASK_EAGER_PROPAGATES = True

Or you can override the settings with a decorator directly in your test:

import unittest
from django.test.utils import override_settings


class MyTestCase(unittest.TestCase):

    @override_settings(CELERY_TASK_EAGER_PROPAGATES=True,
                       CELERY_TASK_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_mytask(self):
        ...
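
With these settings, tasks run synchronously in the test process and .delay() returns an EagerResult, so the test body can assert on the result directly. A minimal sketch, assuming a hypothetical add task (add(x, y) returning x + y) defined in myapp.tasks:

import unittest

from django.test.utils import override_settings

from myapp.tasks import add  # hypothetical task, used only for illustration


class MyEagerTestCase(unittest.TestCase):

    @override_settings(CELERY_TASK_EAGER_PROPAGATES=True,
                       CELERY_TASK_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_add(self):
        result = add.delay(2, 3)              # runs inline, no worker needed
        self.assertTrue(result.successful())  # EagerResult reports success
        self.assertEqual(result.get(), 5)     # .get() returns immediately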
Guillaume Vincent
  • two settings names are different now (v4.2): CELERY_TASK_ALWAYS_EAGER, CELERY_TASK_EAGER_PROPAGATES -- great answer otherwise – C S Aug 29 '18 at 23:27
  • Is Celery picking up options outside the _namespace_ to pick up `BROKER_BACKEND`? What should `CELERY_BROKER_URL` be then? – Pablo Fernandez Nov 18 '18 at 11:50
  • `BROKER_BACKEND` apparently is only for Celery 3.x (https://github.com/celery/celery/blob/v3.1.26/celery/app/utils.py#L72). Celery 4.x/5.x is using `CELERY_BROKER_URL = "memory://"` (https://github.com/celery/celery/blob/5.0/celery/contrib/testing/app.py#L17) – Ranel Padon Dec 31 '21 at 23:09
  • I needed to change my result backend to SQLAlchemy: `CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'` – Helge Schneider May 11 '22 at 12:59
  • OP said they didn't want to enable CELERY_ALWAYS_EAGER – Le Frite Mar 22 '23 at 08:18

You can use the Kombu in-memory broker to run unit tests; however, to do so you need to spin up a Celery worker using the same Celery app object as the Django server.

To use the in-memory broker, set BROKER_URL to memory://localhost/
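
For example, in the test settings (a sketch; with the CELERY_ namespace on Celery 4.x+ the setting is CELERY_BROKER_URL instead):

import sys

if 'test' in sys.argv[1:]:
    BROKER_URL = 'memory://localhost/'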

Then, to spin up a small Celery worker, you can do the following:

app = <Django Celery App>

# Set the worker up to run in-place instead of using a pool
app.conf.CELERYD_CONCURRENCY = 1
app.conf.CELERYD_POOL = 'solo'

# Code to start the worker
def run_worker():
    app.worker_main()

# Create a daemon thread and run the worker in it
import threading
t = threading.Thread(target=run_worker, daemon=True)
t.start()

You need to make sure you use the same app as the Django Celery app instance.

Note that starting the worker will print many things and modify logging settings.
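
With the worker thread running against the memory:// broker (and a result backend configured so results can be fetched), the test from the question can then be written roughly like this (a sketch reusing the names from the question; the timeout keeps the test from hanging forever if the task is never consumed):

def test_async_job(self):
    response = self.client.post("/api/jobs/", some_test_data, format="json")
    task_id = response.data['task_id']
    # The worker thread picks the task up from the in-memory broker
    result = my_task.AsyncResult(task_id).get(timeout=10)
    self.assertEqual(result, expected_result)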

Gal Hochberg
  • Note that `app.worker_main()` picks the arguments from the command line, whatever that was to run the test/script, which is probably not what you want for running Celery. – Pablo Fernandez Nov 19 '18 at 10:39

Here's a more fully featured example of a Django TransactionTestCase that works with Celery 4.x.

import threading

from django.test import TransactionTestCase
from django.db import connections

from myproj.celery import app  # your Celery app


class CeleryTestCase(TransactionTestCase):
    """Test case with Celery support."""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        app.control.purge()
        cls._worker = app.Worker(app=app, pool='solo', concurrency=1)
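        # Close any open DB connections before starting the worker thread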
        connections.close_all()
        cls._thread = threading.Thread(target=cls._worker.start)
        cls._thread.daemon = True
        cls._thread.start()

    @classmethod
    def tearDownClass(cls):
        cls._worker.stop()
        super().tearDownClass()

Be aware this doesn't change your queue names to testing queues, so if you're also running the app you'll want to do that too.
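
One way to do that (a sketch, assuming Celery 4.x lowercase setting names and no custom routing) is to point the app at a dedicated default queue before starting the worker, so a development worker on the same broker never consumes test tasks:

# e.g. in setUpClass, before cls._thread.start()
app.conf.task_default_queue = 'test_queue'
app.conf.task_default_exchange = 'test_queue'
app.conf.task_default_routing_key = 'test_queue'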

Danielle Madeley
  • Very good answer! It lets us avoid the `task_always_eager` option that fakes workers. For accessing the current celery app, it is in fact easier to do just `from celery import current_app`. – Raffi Jul 04 '17 at 12:11
  • Is there any reason to have chosen a `TransactionTestCase` rather than the normal testcase? – Jonathan Jul 20 '17 at 14:00
  • @danielle This sadly doesn't work for me: I get conn_errors = self.channel.connection.client.connection_errors AttributeError: 'NoneType' object has no attribute 'client' – Jonathan Jul 20 '17 at 14:25
  • @Jonathan yes, if you don't have real transactions, you won't be able to share data between threads (i.e. can't actually commit). – Danielle Madeley Jul 22 '17 at 03:30

To use a memory broker for all Celery tests in Django, this pytest fixture can be used:

test.py

import pytest
from django.conf import settings
from django.test import TestCase


class TestStartFeatureDetectionView(TestCase):
    
    @pytest.fixture(autouse=True)
    def set_celery_broker_to_memory(self):
        settings.BROKER_BACKEND = 'memory://'
        settings.CELERY_BROKER_URL = 'memory://'
        settings.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

An SQLite database (via SQLAlchemy) is used to store the results.
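
If pytest-django is installed, its settings fixture achieves the same thing while restoring the original values after each test. A sketch, placed for example in conftest.py:

import pytest


@pytest.fixture(autouse=True)
def celery_test_settings(settings):
    # `settings` is pytest-django's fixture; changes are rolled back per test
    settings.CELERY_BROKER_URL = 'memory://'
    settings.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'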

Helge Schneider