I am trying to implement some Celery chains/groups/chords using Django 3.0, Celery 4.3, Redis and Python 3.6. From the documentation, I understood that tasks in a group run in parallel and tasks in a chain run sequentially, but I am not observing that behavior.
I have this chain of task signatures:
```python
transaction.on_commit(lambda: chain(
    clean,
    group(chain(faces_1, faces_2), folder, hashes),
    change_state_task.si(document_id, 'ready'),
).delay())
```
where I expect `change_state_task` to wait for all the other tasks to complete before it starts. It does not: `change_state_task` started before `hashes` finished. All the tasks ran and completed successfully.
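To make that expectation concrete, here is a toy model of the ordering expected from the first chain. It is plain Python with `concurrent.futures` threads standing in for Celery workers, and `step`, `chain_of` and `group_of` are hypothetical helpers, not Celery API. (The Celery docs also note that chaining a group with another task upgrades it to a chord, and chords need a result backend.)

```python
import time
from concurrent.futures import ThreadPoolExecutor

events = []  # (label, "start" | "end") tuples, in the order they happen


def step(label, seconds=0.01):
    """Stand-in for a task body: records when it starts and when it ends."""
    def run():
        events.append((label, "start"))
        time.sleep(seconds)
        events.append((label, "end"))
        return label
    return run


def chain_of(*steps):
    """Like a chain: each step starts only after the previous one returns."""
    def run():
        result = None
        for s in steps:
            result = s()
        return result
    return run


def group_of(*steps):
    """Like a group: start all members together, then wait for every one."""
    def run():
        with ThreadPoolExecutor(max_workers=len(steps)) as pool:
            futures = [pool.submit(s) for s in steps]
            return [f.result() for f in futures]
    return run


# Mirrors chain(clean, group(chain(faces_1, faces_2), folder, hashes), change_state)
workflow = chain_of(
    step("clean"),
    group_of(chain_of(step("faces_1"), step("faces_2")), step("folder"), step("hashes", 0.05)),
    step("change_state"),
)
workflow()
```

In this model `change_state` starts only after `faces_2`, `folder` and `hashes` have all ended, even though `hashes` is the slowest member of the group.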
I then tried this chain:
```python
transaction.on_commit(lambda: chain(
    clean, faces_1, faces_2, folder, hashes,
    change_state_task.si(document_id, 'ready'),
).delay())
```
where all the signatures are in one long chain. However, `change_state_task` still starts before the `hashes` task finishes.
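The body of `compute_image_descriptor_task` is not shown, so the following is only an assumption about one way this symptom can arise. A chain sends its next task once the previous task returns, so a task body that hands its real work off (for example by calling `.delay()` on another task, or by starting a thread) and then returns lets the next link start while that work is still running. The model below uses a plain thread instead of Celery; `hashes_body`, `change_state_body`, `gate` and `run_chain` are hypothetical names.

```python
import threading

events = []
gate = threading.Event()  # released later; stands in for "this work takes a while"
background = []           # threads started by the task body that nobody waits for


def hashes_body():
    """Hypothetical task body that hands its real work off and returns at once."""
    def work():
        gate.wait()
        events.append("hashes work finished")
    t = threading.Thread(target=work)
    t.start()
    background.append(t)
    events.append("hashes returned")


def change_state_body():
    events.append("change_state started")


def run_chain(*bodies):
    # The chain only sees each body return; it cannot see leftover threads.
    for body in bodies:
        body()


run_chain(hashes_body, change_state_body)
gate.set()  # now let the handed-off work complete
for t in background:
    t.join()
print(events)
# → ['hashes returned', 'change_state started', 'hashes work finished']
```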
I even tried `change_state_task.s(document_id, 'ready')` (replacing `si` with `s`), thinking that `change_state_task` could not start without the output of the `hashes` task. But it still starts before `hashes` ends.
I then tried both `task.s` and `task.si` for all the task signatures, and `change_state_task` still started before the `hashes` task ended.
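On the `s` versus `si` point: the Celery docs describe a partial signature (`.s()`) in a chain as receiving its parent's result as its first argument, while an immutable signature (`.si()`) ignores that result. Either way the next link runs only after the previous task returns, so switching between them changes the arguments, not the ordering. The toy model below imitates that argument handling; `Sig` and `run_chain` are hypothetical helpers, not Celery API, and the task functions are stand-ins that only record how they were called.

```python
from typing import Any, Callable, Tuple


class Sig:
    """Toy stand-in for a signature: a function plus its partial args.
    immutable=True mimics .si(); immutable=False mimics .s()."""

    def __init__(self, fn: Callable[..., Any], *args: Any, immutable: bool = False):
        self.fn = fn
        self.args = args
        self.immutable = immutable

    def call_with_parent(self, parent_result: Any) -> Any:
        # .s(): the parent's result is prepended to the partial args.
        # .si(): the parent's result is ignored.
        args: Tuple[Any, ...] = self.args if self.immutable else (parent_result,) + self.args
        return self.fn(*args)


def run_chain(*sigs: Sig) -> Any:
    """Run signatures in order, feeding each one its parent's result."""
    result = None
    for i, sig in enumerate(sigs):
        result = sig.fn(*sig.args) if i == 0 else sig.call_with_parent(result)
    return result


calls = []


def find_faces_task(*args):
    calls.append(("find_faces_task", args))
    return "faces"


def change_state_task(*args):
    calls.append(("change_state_task", args))
    return "done"


document_id = 42
run_chain(
    Sig(lambda: "cleaned"),                                        # first task
    Sig(find_faces_task, document_id),                             # like .s(document_id)
    Sig(change_state_task, document_id, "ready", immutable=True),  # like .si(...)
)
```

So with `.s()`, a task such as `find_faces_task.s(document_id)` placed after another task is called with that task's return value in front of `document_id`.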
What am I missing?
Thanks!
Mark
PS: Apologies for not being clear about my task signatures. I have a long Python method that figures out which tasks have to be run. It looks something like this:
```python
import logging

from celery import chain
from django.conf import settings
from django.db import transaction

logger = logging.getLogger(__name__)

# `app` is the project's Celery application; the real tasks
# (clean_document_image, find_faces_task, ...) are defined elsewhere.


@app.task(bind=True)
def noop(self, message):
    # Placeholder task: logs the string it is given and does nothing else
    logger.debug(message)
    return True


def figure_out_which_tasks_to_fire(document_id):
    # (elided: the real method computes clean_needed, faces_needed,
    # folder_needed, hashes_needed, key, value, file_name,
    # source_folder and hash_name)
    clean = noop.si("replaces clean_document_image task")
    faces_1 = noop.si("replaces find_faces_task task")
    faces_2 = noop.si("replaces recognize_face_task task")
    folder = noop.si("replaces update_source_folder task")
    hashes = noop.si("replaces compute_image_descriptor_task task")
    if clean_needed:
        clean = clean_document_image.s(document_id, key, value)
    if faces_needed:
        faces_1 = find_faces_task.s(document_id)
        faces_2 = recognize_face_task.s(document_id)
    if folder_needed:
        folder = update_source_folder.s(document_id, file_name, source_folder)
    if hashes_needed:
        hashes = compute_image_descriptor_task.s(settings.DEFAULT_SIMILAR_IMAGE, document_id, hash_name)
    # Finished figuring out what needs to be done, so do the tasks
    # and then update the state of the document.
    transaction.on_commit(lambda: chain(
        clean, faces_1, faces_2, folder, hashes,
        change_state_task.s(document_id, 'ready'),
    ).delay())
```
I need `transaction.on_commit` because all the tasks read from and write to the Django app's backend MySQL database.