
I know this will be seen as a duplicate, but I looked around before asking this question, and all of the existing questions seem to be either outdated or of no help at all with my problem. This is where I've looked before writing this question:


I'm currently working on a project that heavily uses Celery to handle asynchronous tasks. To make the entire code base stable I'm writing unit tests for the whole project; however, I haven't been able to write a single working test for Celery so far.

Most of my code needs to keep track of the tasks that were run in order to determine whether or not all results are ready to be queried. This is implemented in my code as follows:

@app.task(bind=True)
def some_task(self, record_id):
    associate(self.request.id, record_id)  # Not the actual DB code, but you get the idea

# Somewhere else in my code, eg: Flask endpoint
record = some_db_record()
some_task.apply_async(args=[record.id])

Since I don't have a *nix-based machine to run my code on, I tried solving this by setting the `task_always_eager` option to true; however, this causes issues whenever any subtask tries to query the result:

import time

from celery.result import AsyncResult

@app.task(bind=True)
def foo(self):
    task = bar.apply_async()
    foo_poll.apply_async(args=[task.id])

@app.task(bind=True, max_retries=None)
def foo_poll(self, celery_id):
    task = AsyncResult(celery_id)
    if not task.ready():  # RuntimeError: Cannot retrieve result with task_always_eager enabled
        return self.retry(countdown=5)
    else:
        pass  # Do something with the result

@app.task
def bar():
    time.sleep(10)

I tried fixing this by patching the AsyncResult methods; however, this caused issues because self.request.id would be None:

from unittest.mock import patch

from celery.result import AsyncResult
from celery.states import SUCCESS

# In the test
with patch.object(AsyncResult, "_get_task_meta", side_effect=lambda: {"status": SUCCESS, "result": None}) as method:
    foo()

# In the tasks module
@app.task(bind=True)
def foo(self):
    pass  # self.request.id is now None, which I need to track sub-tasks

Does anyone know how I could do this? Or whether Celery is even worth using anymore? I'm at the point where I find the documentation and every question related to testing so overwhelmingly complex that I just feel like ditching it altogether and going back to multithreading.

Paradoxis

3 Answers


It is possible to test the task's function without the Celery task binding by calling the underlying function directly and passing a mock in place of the task object.

The inner function is hidden behind some_task.__wrapped__.__func__.

Here is an example of how to use it in a test case:

from unittest.mock import Mock

def test_some_task(self):
    mock_task = Mock()
    mock_task.request.id = 5  # your test data here
    record_id = 5  # more test data
    some_task_inner = some_task.__wrapped__.__func__
    some_task_inner(mock_task, record_id)
    # ...
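To see why the `.__func__` step is needed: with bind=True, Celery (as far as I can tell from its source) stores the original function as a class attribute on the task class it generates, so reading it through the task instance gives a bound method whose `self` is the real task. The sketch below reproduces only that Python mechanic with a hypothetical stand-in class, so it runs without Celery; `FakeTaskClass` and `some_task_body` are made up for illustration.

```python
from unittest.mock import Mock


def some_task_body(self, record_id):
    # Stand-in for the original function you decorated with @app.task(bind=True).
    return (self.request.id, record_id)


class FakeTaskClass:
    # Hypothetical stand-in for the class Celery generates per task; the
    # original function is stored as a plain class attribute.
    __wrapped__ = some_task_body


some_task = FakeTaskClass()  # Celery tasks are instances of such a class

# Reading a function attribute through an instance yields a *bound* method,
# with the task instance already filled in as `self`.
bound = some_task.__wrapped__
# __func__ strips that binding, so any object (e.g. a Mock) can act as `self`.
inner = bound.__func__

mock_task = Mock()
mock_task.request.id = "task-123"
print(inner(mock_task, 5))  # ('task-123', 5)
```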

Erik Kalkoken
    I actually write my tests like this these days, just forgot I had posted this question in the first place :p – Paradoxis May 11 '20 at 11:27

I had about the same issue and came up with two possible approaches:

  1. Call tasks directly in tests, and wrap every inner Celery interaction in an if self.request.called_directly check: run the subtask directly if it is True, or with apply_async if it is False.
  2. Wrap task.ready() and other status checks in functions that check for ALWAYS_EAGER as well as task readiness.
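A minimal sketch of the second approach, assuming a helper named `is_task_ready` (hypothetical, not a Celery API) that every readiness check goes through, including checks made outside tasks such as a model's `is_ready()` method. It only relies on the app's `conf.task_always_eager` setting and `app.AsyncResult(task_id)`; the usage at the bottom passes a `Mock` in place of a real app so the sketch runs without a broker.

```python
from unittest.mock import Mock


def is_task_ready(app, task_id):
    """Hypothetical helper: treat tasks as ready when running eagerly.

    With task_always_eager enabled, the task has already run inline, and
    querying its AsyncResult can fail, so the backend lookup is skipped.
    """
    if app.conf.task_always_eager:
        return True
    return app.AsyncResult(task_id).ready()


def all_tasks_ready(app, task_ids):
    # The kind of check a model's is_ready() method could delegate to.
    return all(is_task_ready(app, task_id) for task_id in task_ids)


# Usage with a stand-in app object; a real Celery app exposes the same
# conf.task_always_eager setting and AsyncResult attribute.
eager_app = Mock()
eager_app.conf.task_always_eager = True
print(all_tasks_ready(eager_app, ["id-1", "id-2"]))  # True
```

The design choice is that only one function knows about eager mode, so the rest of the code base never touches `AsyncResult` directly and tests can swap the behavior in one place.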

Eventually I came up with a mix of both, with the rule of avoiding nested tasks as much as I can. I also put as little code as possible inside @app.task functions, so that the task logic can be tested in as much isolation as possible.
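A sketch of the "as little code in @app.task as possible" idea, applied to the tracking task from the question. `track_record` is a hypothetical name, and `associate` is passed in as an argument so a test can replace it with a mock; the thin Celery wrapper is shown only as a comment so the sketch runs without Celery installed.

```python
from unittest.mock import Mock


def track_record(task_id, record_id, associate):
    # Hypothetical plain function holding the real work. It receives the
    # task id explicitly instead of reading self.request.id, so it can be
    # unit-tested without Celery at all.
    associate(task_id, record_id)


# The Celery task then becomes a thin wrapper, roughly:
#
# @app.task(bind=True)
# def some_task(self, record_id):
#     track_record(self.request.id, record_id, associate)

# Unit test of the logic, with a mock standing in for the DB call:
fake_associate = Mock()
track_record("task-42", 7, fake_associate)
fake_associate.assert_called_once_with("task-42", 7)
```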

It might look quite frustrating and awful, but in fact it's not.

You can also check how big projects like Sentry do this (spoiler: mocks and some nifty helpers).

So it's definitely possible; it's just not easy to find best practices for it.

valignatev
  • The problem with `self.request.called_directly` is that celery code is used throughout my project, not just in the task itself (eg: `AsyncResult` is queried as part of one of my models, it has a method `is_ready` that checks if all data has been inserted, in which it uses `AsyncResult` to determine if all tasks inserted their data) meaning I don't always have access to it – Paradoxis Jul 11 '17 at 09:29
  • Yeah, that's why there is a second way, which you can use outside of the task. Is there something wrong with it too? – valignatev Jul 11 '17 at 10:18

I haven't used Celery for some time, but unless things have changed, you should be able to call your methods directly to exercise them in unit tests.

@app.task(bind=True, max_retries=None)
def foo_poll(self, celery_id):
    task = AsyncResult(celery_id)
    if not task.ready():  # RuntimeError: Cannot retrieve result with task_always_eager enabled
        return self.retry(countdown=5)
    else:
        pass  # Do something with the result

For your unit test you could:

  • Patch AsyncResult to trigger the correct branch
  • Instantiate your class
  • Patch the retry method and assert that it's called
  • Exercise your method directly
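Those steps could look roughly like this. The sketch makes a few assumptions: the task body is written as a plain function (`foo_poll_body`, a hypothetical name) that takes the task as `self`, a `Mock` stands in for the task instance (which also covers the "instantiate" and "patch retry" steps), and the `result` return value is made up for the example. `AsyncResult` is a placeholder and is patched via `mock.patch.dict(globals(), ...)` only so the sketch runs standalone; in a real suite you would patch the name in your tasks module, e.g. `mock.patch("yourapp.tasks.AsyncResult")` (module path hypothetical).

```python
from unittest import mock

# Placeholder so the sketch runs standalone; in the real tasks module this
# would be `from celery.result import AsyncResult`.
AsyncResult = None


def foo_poll_body(self, celery_id):
    # Same logic as foo_poll, as a plain function that takes the task as `self`.
    task = AsyncResult(celery_id)
    if not task.ready():
        return self.retry(countdown=5)
    return task.result  # hypothetical: hand back the finished result


def run_poll(ready):
    # Patch AsyncResult so ready() takes the branch we want to test.
    fake_result_cls = mock.Mock()
    fake_result_cls.return_value.ready.return_value = ready
    fake_result_cls.return_value.result = "done"
    # A Mock as the task instance makes retry() a mock we can assert on.
    fake_task = mock.Mock()
    with mock.patch.dict(globals(), {"AsyncResult": fake_result_cls}):
        # Exercise the logic directly, without a broker or worker.
        outcome = foo_poll_body(fake_task, "some-task-id")
    return fake_task, outcome


fake_task, outcome = run_poll(ready=False)
fake_task.retry.assert_called_once_with(countdown=5)

fake_task, outcome = run_poll(ready=True)
fake_task.retry.assert_not_called()
print(outcome)  # done
```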

This, of course, only exercises your method's logic, not Celery. I usually add one or two integration (collaboration) tests that set ALWAYS_EAGER so that execution goes through the Celery code, even though the tasks run in memory without a queue.
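A minimal sketch of the settings such an integration test might use, assuming a throwaway app named "tests"; in a real project you would apply the same settings to your existing app instance in your test setup. Both options are documented Celery settings. Keep in mind that, as the question shows, `AsyncResult` lookups can still fail in this mode, so those code paths need their own handling.

```python
from celery import Celery

# Hypothetical app used only for this sketch; apply the same settings to your
# real app instance in your test setup instead.
app = Celery("tests")
app.conf.update(
    task_always_eager=True,      # delay()/apply_async() run the task inline, no broker or worker
    task_eager_propagates=True,  # exceptions raised inside the task are re-raised in the caller
)
```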

dm03514
  • I've tried calling my tasks directly and patching a ton of stuff, but the `app.task` decorator is so complex that I haven't been able to find a solution so far – Paradoxis Jul 11 '17 at 09:27
  • Could you post a minimum working example of the celery code you're trying to test, the full class definition, and your complete unit test? – dm03514 Jul 11 '17 at 10:33