I know this will be seen as a duplicate, but I have looked around before asking this question, however all of the questions seem to be either outdated or don't help at all with my problem. This is where I've looked before writing this question:
- Official Docs
- How do you unit test a Celery task? (5 years old, all dead links)
- How to unit test code that runs celery tasks? (2 years old)
- How do I capture Celery tasks during unit testing? (3 years old)
I'm currently working on a project that heavily uses Celery to handle asynchronous tasks; to make the entire code-base stable I'm writing unit tests for the entire project however I haven't been able to write a single working test for Celery so far.
Most of my code needs to keep track of the tasks that were run in order to determine wether or not all results are ready to be queried. This is implemented in my code as follows:
@app.task(bind=True)
def some_task(self, record_id):
associate(self.request.id, record_id) # Not the actual DB code, but you get the idea
# Somewhere else in my code, eg: Flask endpoint
record = some_db_record()
some_task.apply_async(args=[record.id])
Since I don't have a *nix based machine to run my code on, I tried solving this by setting the always eager option to true, however this causes issues whenever any sub-task tries to query the result:
@app.task(bind=True)
def foo(self):
task = bar.apply_async()
foo_poll.apply_async(args=[task.id])
@app.task(bind=True, max_retries=None):
def foo_poll(self, celery_id)
task = AsyncResult(celery_id)
if not task.ready(): # RuntimeError: Cannot retrieve result with task_always_eager enabled
return self.retry(countdown=5)
else:
pass # Do something with the result
@app.task
def bar():
time.sleep(10)
I tried fixing this by patching the AsyncResult
methods, however this caused issues as self.request.id
would be None
:
with patch.object(AsyncResult, "_get_task_meta", side_effect=lambda: {"status": SUCCESS, "result": None}) as method:
foo()
@app.task(bind=True)
def foo(self):
pass # self.request.id is now None, which I need to track sub-tasks
Does anyone know how I could do this? Or if Celery is even worth using anymore? I'm at the point where I find the documentation and any questions related to testing so overwhelmingly complex I just feel like ditching it all together and just go back to multithreading.