I am trying to test whether a celery task has been started after a request to my django server. I have something like this:
# tasks.py
def add(x, y):
return x + y
# views.py
def home(request): # respond to request at root url
tasks.add.delay(1,2)
return HttpResponse('hello world')
# tests.py
class MyTest(TestCase):
def test_task_triggered(self):
self.client.get('/')
# XXXX HOW TO TEST THAT TASK HAS BEEN TRIGGERED?
How can I test whether or not the task has been started in my unit tests? Obviously, I don't have direct access to the task id otherwise something like this would work.
More generally, how can you detect celery tasks being triggered from across functions, classes, or modules?
Thanks for your help.