
When a Django test case runs, it creates an isolated test database so that database writes are rolled back when each test completes. I am trying to create an integration test with Celery, but I can't figure out how to connect Celery to this ephemeral test database. In the naive setup, objects saved by Django are invisible to Celery, and objects saved by Celery persist indefinitely.

Here is an example test case:

import json
from rest_framework.test import APITestCase
from myapp.models import MyModel
from myapp.util import get_result_from_response

class MyTestCase(APITestCase):
    @classmethod
    def setUpTestData(cls):
        # This object is not visible to Celery
        MyModel(id='test_object').save()

    def test_celery_integration(self):
        # This view spawns a Celery task
        # Task should see MyModel.objects.get(id='test_object'), but can't
        http_response = self.client.post('/', 'test_data', format='json')

        result = get_result_from_response(http_response)
        result.get()  # Wait for task to finish before ending test case
        # Objects saved by Celery task should be deleted, but persist

I have two questions:

  1. How do I make it so that Celery can see the objects created by the Django test case?

  2. How do I ensure that all objects saved by Celery are automatically rolled back once the test completes?

I am willing to manually clean up the objects if doing this automatically is not possible, but even a deletion of objects in tearDown of an APISimpleTestCase seems to be rolled back.

drhagen

3 Answers


This is possible by starting a Celery worker within the Django test case.

Background

Django's in-memory test database is SQLite. As the SQLite documentation on in-memory databases says, "[A]ll database connections sharing the in-memory database need to be in the same process." This means that, as long as Django uses an in-memory test database and Celery is started in a separate process, it is fundamentally impossible for Celery and Django to share a test database.

However, with celery.contrib.testing.worker.start_worker, it is possible to start a Celery worker in a separate thread within the same process. This worker can access the in-memory database.

This assumes that Celery is already set up in the usual way with the Django project.
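If you do need Celery running in a truly separate process, a partial workaround (a sketch; the file name below is invented) is to tell Django to use a file-based SQLite test database instead of the in-memory default, which other processes can open:

```python
# settings.py (sketch): give the test database a real file instead of ':memory:'
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': 'db.sqlite3',
        'TEST': {
            'NAME': 'test_db.sqlite3',  # invented file name
        },
    },
}
```

Note that an ordinary TestCase still wraps each test in an uncommitted transaction, so a separate process would not see the data anyway; this only helps in combination with TransactionTestCase.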

Solution

Because Django-Celery involves cross-thread communication, only test cases that don't run each test in an isolated transaction will work. The test case must inherit directly from SimpleTestCase, or its REST framework equivalent APISimpleTestCase, and set databases to '__all__' (or just the database that the test interacts with).

The key is to start a Celery worker in the setUpClass method of the TestCase and shut it down in the tearDownClass method. The function that does this is celery.contrib.testing.worker.start_worker, which requires an instance of the current Celery app (typically obtained from mysite.celery.app) and returns a Python context manager with __enter__ and __exit__ methods, which must be called in setUpClass and tearDownClass, respectively. There is probably a way to avoid manually entering and exiting the context manager with a decorator or something, but I couldn't figure it out. Here is an example tests.py file:

from celery.contrib.testing.worker import start_worker
from django.test import SimpleTestCase

from mysite.celery import app

class BatchSimulationTestCase(SimpleTestCase):
    databases = '__all__'

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

        # Start up celery worker
        cls.celery_worker = start_worker(app, perform_ping_check=False)
        cls.celery_worker.__enter__()

    @classmethod
    def tearDownClass(cls):
        # Shut down the worker before running the rest of class teardown
        cls.celery_worker.__exit__(None, None, None)
        super().tearDownClass()

    def test_my_function(self):
        # e.g. result = my_task.delay(); self.assertTrue(result.get(timeout=10))
        pass
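The manual __enter__/__exit__ calls follow the generic context-manager protocol, so the lifecycle can be illustrated without Celery at all. In this stdlib-only sketch, fake_worker and its state dict are invented stand-ins for start_worker and the worker it returns:

```python
import contextlib
import unittest

@contextlib.contextmanager
def fake_worker():
    # Invented stand-in for start_worker(app): set up on enter, tear down on exit
    state = {"running": True}
    try:
        yield state
    finally:
        state["running"] = False

class WorkerLifecycleTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls._cm = fake_worker()           # like start_worker(app, ...)
        cls.worker = cls._cm.__enter__()  # enter the context manually

    @classmethod
    def tearDownClass(cls):
        cls._cm.__exit__(None, None, None)  # leave the context manually
        super().tearDownClass()

    def test_worker_is_running(self):
        # The worker stays alive for the duration of every test in the class
        self.assertTrue(self.worker["running"])
```

On Python 3.8+, cls.addClassCleanup(cls._cm.__exit__, None, None, None) in setUpClass can replace the manual tearDownClass bookkeeping.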

For whatever reason, the testing worker tries to use a task called 'celery.ping', probably to provide better error messages in the case of worker failure. The task it is looking for is celery.contrib.testing.tasks.ping, which is not available at test time. Setting the perform_ping_check argument of start_worker to False skips the check for this and avoids the associated error.

Now, when the tests are run, there is no need to start a separate Celery process. A Celery worker will be started in the Django test process as a separate thread. This worker can see any in-memory databases, including the default in-memory test database. To control the number of workers, there are options available in start_worker, but it appears the default is a single worker.

drhagen
  • Hi, this approach definitely seems interesting -- but it seems that I'm not able to communicate with the celery worker when I use this method. I'll poke around with it if I get any spare time. – Marviel Nov 07 '17 at 12:53
  • 9
    A workaround for the ping task issue is adding `app.loader.import_module('celery.contrib.testing.tasks')` before `cls.celery_worker = start_worker(app)` or by adding `'celery.contrib.testing.tasks'` to `INSTALLED_APPS` (note the `.tasks` at the end). – Tirzono Aug 25 '18 at 20:13
  • How to read/access the data that is created after the celery worker is spawned? It seems to work with the data created in the `setUpClass` function but I get empty data for anything created afterward. – Afsan Abdulali Gujarati Jun 16 '21 at 23:04
  • 3
    I get an error like `ERROR/MainProcess] Signal handler > raised: InterfaceError('connection already closed')` . Any idea on how to solve it? – Eduardo Gomes Mar 11 '22 at 01:37
  • 1
    @EduardoGomes im having the same error. Did you manage to fix it? – Diogo Silva Feb 01 '23 at 21:46
  • 1
    Same issue here with "connection already closed", I could not find a solution – Marcio Cruz Apr 27 '23 at 18:03

For your unit tests I would recommend skipping the Celery dependency; the two following links will provide you with the necessary information to start your unit tests:

If you really want to test the Celery function calls, including a queue, I'd probably set up a docker-compose stack with the server, worker, and queue combination and extend the custom CeleryTestRunner from the django-celery docs. But I wouldn't see a benefit from it, because the test system would probably be too far away from production to be representative.
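The "skip the Celery dependency" approach generally means running tasks eagerly during tests. A sketch of the relevant settings (modern lowercase Celery names with the CELERY_ prefix, assuming the app is configured via app.config_from_object('django.conf:settings', namespace='CELERY')):

```python
# Test-only Django settings (a sketch; adjust names to your Celery version)
CELERY_TASK_ALWAYS_EAGER = True       # run tasks synchronously, in-process
CELERY_TASK_EAGER_PROPAGATES = True   # re-raise task exceptions in the test
```

Because eager tasks run in the test process, they hit the same test database and transaction as the test itself.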

  • I've read both those links. But neither shows saving/reading anything to/from a database, which is kind of important in Django. – drhagen Oct 02 '17 at 18:50
  • 1
    They actually tell you to create a new test runner and run your tests with it, resulting in the tests skipping the async part of Celery. With this you should be able to treat your tests as if they are synchronous, with both parts of the (former) Celery tasks hitting the same test DB. – Johannes Reichard Oct 02 '17 at 19:13
  • 1
    Are you talking about `CELERY_ALWAYS_EAGER`? I tried that. When I set that to True, Celery still used the production database; it just waited for the tasks to finish before `delay` returned. – drhagen Oct 02 '17 at 19:18
  • 1
  • hmm, very strange. Did you run your tests with the new test runner? I forgot this in the beginning and it took me a while to recognise my mistake ^^ – Johannes Reichard Oct 02 '17 at 19:21
  • 1
  • We are using a test runner inheriting from `CeleryTestSuiteRunner`, but the only custom thing we are doing is some custom fixture loading. – Johannes Reichard Oct 02 '17 at 19:27
  • I just get `No module named 'celery.contrib.test_runner'`. As far as I can tell, it was removed from Celery at some point. – drhagen Oct 02 '17 at 19:37
  • 1
    Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/155785/discussion-between-johannes-reichard-and-drhagen). – Johannes Reichard Oct 02 '17 at 19:38
  • 1
    I just figured out we are using a slightly outdated version of django-celery; what version are you using? – Johannes Reichard Oct 02 '17 at 19:43

I found another workaround, based on @drhagen's solution:

Call celery.contrib.testing.app.TestApp() before calling start_worker(app):

from celery.contrib.testing.worker import start_worker
from celery.contrib.testing.app import TestApp

from myapp.tasks import app, my_task


class TestTasks:
    def setup(self):
        TestApp()
        self.celery_worker = start_worker(app)
        self.celery_worker.__enter__()

    def teardown(self):
        self.celery_worker.__exit__(None, None, None)
  • I am having a similar issue with `django-channles` any help please? [my question](https://stackoverflow.com/questions/68190400/django-concurrency-cant-create-data-for-testing-in-async-function) – Ali Husham Jul 14 '21 at 13:10