I am unit testing celery tasks. I have chain tasks that also have groups, so a chord is resulted.
The test should look like:
- run celery task ( delay )
- wait for task and all subtasks
- assert
I tried the following:
def wait_for_result(result):
result.get()
for child in result.children or list():
if isinstance(child, GroupResult):
# tried looping over task result in group
# until tasks are ready, but without success
pass
wait_for_result(child)
This creates a deadlock, chord_unlock being retried forever. I am not interested in task results. How can I wait for all the subtasks to finish?