I am trying to implement some Celery chains, groups, and chords using Django 3.0, Celery 4.3, Redis, and Python 3.6. From the documentation, I understood that tasks in a group run in parallel and tasks in a chain run sequentially, but I am not observing that behavior.

I have this chain of task signatures:

transaction.on_commit(lambda: chain(clean, group(chain(faces_1, faces_2), folder, hashes), change_state_task.si(document_id, 'ready')).delay()) 

where I expect change_state_task to wait for all the other tasks to complete before it starts. This did not work: change_state_task started before hashes finished, even though all the tasks ran and completed successfully.

I then tried this chain:

transaction.on_commit(lambda: chain(clean, faces_1, faces_2, folder, hashes, change_state_task.si(document_id, 'ready')).delay()) 

where all the signatures are in one long chain. However, change_state_task still starts before the hashes task finishes.

I even tried change_state_task.s(document_id, 'ready') (replacing si with s), thinking that change_state_task could not start without the output of the hashes task. It still starts before hashes ends.

I then tried task.s instead of task.si for every task signature, and change_state_task still started before the hashes task ended.

What am I missing?

Thanks!

Mark

PS: Apologies for not being clear about my task signatures. I have a long Python method that figures out which tasks have to be run. It looks something like this:

@app.task(bind=True)
def noop(self, message):
    # Task accepts a string and does nothing
    logger.debug(message)
    return True    

def figure_out_which_tasks_to_fire(document_id):
    clean = noop.si("replaces clean_document_image task")
    faces_1 = noop.si("replaces find_faces_task task")
    faces_2 = noop.si("replaces recognize_face_task task")
    folder = noop.si("replaces update_source_folder task")
    hashes = noop.si("replaces compute_image_descriptor_task task")
    if clean_needed:
        clean = clean_document_image.s(document_id, key, value)
    if faces_needed:
        faces_1 = find_faces_task.s(document_id)
        faces_2 = recognize_face_task.s(document_id)
    if folder_needed:
        folder = update_source_folder.s(document_id, file_name, source_folder)
    if hashes_needed:
        hashes = compute_image_descriptor_task.s(settings.DEFAULT_SIMILAR_IMAGE, document_id, hash_name) 
    # Finished figuring out what needs to be done, so do the tasks 
    # and then update the state of the document.
    transaction.on_commit(lambda: chain(clean, faces_1, faces_2, folder, hashes, change_state_task.s(document_id, 'ready')).delay()) 

I need transaction.on_commit because all the tasks read and write the Django app's backend MySQL database.

user1045680
  • As far as I know, you need to use `.s` or `.si` when constructing a signature. Have you tried `clean.s` or `clean.si()` for clean, faces_1, faces_2, folder and hashes? – schillingt Apr 22 '21 at 00:15
  • Thanks for your comment. I added some more explanation to my original post to answer your question. `clean`, `faces_1`, etc. refer to normal Celery task signatures. I initialize them with `noop` task signatures, then replace the `noop` signature with the real task signature if that task should be executed based on some logic. All of the tasks are running as designed; my only issue is that the last task in the chain starts before the second-to-last task in the chain is done. – user1045680 Apr 22 '21 at 00:54

1 Answer

I've had issues with Celery automatically transforming chained groups into chords. Try calling chord() explicitly rather than relying on that conversion.

schillingt
  • Thanks for pointing me in the right direction with chords. This post really helped - https://stackoverflow.com/questions/15123772/celery-chaining-groups-and-subtasks-out-of-order-execution – user1045680 Apr 27 '21 at 21:27