
When I have something like the following

group1 = group(task1.si(), task1.si(), task1.si())
group2 = group(task2.si(), task2.si(), task2.si())

workflow = chain(group1, group2, task3.si())

The intuitive interpretation is that task3 should only execute after all tasks in group2 have finished.

In reality, task3 executes once group1 has started but before it has completed.

What am I doing wrong?

w--

2 Answers


So as it turns out, in Celery you cannot chain two groups together.
I suspect this is because a group chained with a task is automatically upgraded to a chord
--> Celery docs: http://docs.celeryproject.org/en/latest/userguide/canvas.html

Chaining a group together with another task will automatically upgrade it to be a chord:

A group returns a parent task. When two groups are chained together, I suspect that when the first group completes, the chord starts its callback "task", and that this "task" is actually the parent task of the second group. I further suspect that the parent task completes as soon as it has finished kicking off all the subtasks within the group, and as a result the item after the second group executes immediately.
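The suspected race can be sketched outside Celery entirely with a plain-Python threading analogy (the names `parent_task`, `subtask` and `task3` below are made up for illustration): a dispatcher that returns as soon as it has *started* its subtasks lets the next step in the chain run before those subtasks finish.

```python
import threading
import time

done_order = []  # records the order in which things "finish"

def subtask(name, delay):
    time.sleep(delay)          # simulate work inside the group
    done_order.append(name)

def parent_task(specs):
    # Dispatch subtasks asynchronously, then return immediately --
    # analogous to the suspected behaviour of the group "parent task".
    threads = [threading.Thread(target=subtask, args=spec) for spec in specs]
    for t in threads:
        t.start()
    done_order.append("parent")  # parent is "done" before its children
    return threads

def task3():
    done_order.append("task3")

threads = parent_task([("sub_a", 0.2), ("sub_b", 0.3)])
task3()                # the chain moves on as soon as parent_task returns
for t in threads:
    t.join()
print(done_order)      # task3 appears before sub_a and sub_b
```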

To demonstrate this here is some sample code. You'll need to already have a running celery instance.

# celery_experiment.py

from celery import task, group
from celery.signals import task_prerun, task_postrun

import time
import logging

import random
random.seed()

logging.basicConfig(level=logging.DEBUG)

### HANDLERS ###    
@task_prerun.connect()
def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):    
    try:
        logging.info('[%s] starting' % kwargs['id'])
    except KeyError:
        pass

@task_postrun.connect()
def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    try:    
        logging.info('[%s] finished' % kwargs['id'])
    except KeyError:
        pass


def random_sleep(id):
    slp = random.randint(1, 3)
    logging.info('[%s] sleep for %ssecs' % (id, slp))
    time.sleep(slp)

@task()
def thing(id):
    logging.info('[%s] begin' % id)
    random_sleep(id)
    logging.info('[%s] end' % id)


def exec_exp():
    st = thing.si(id='st')
    st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c'),]
    st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b'),]
    st2 = thing.si(id='st2')
    st3 = thing.si(id='st3')
    st4 = thing.si(id='st4')

    grp1 = group(st_arr)
    grp2 = group(st_arr2)

    # chn can chain the two groups because they are separated by a single subtask
    chn = (st | grp1 | st2 | grp2 | st3 | st4)

    # in chn2 you can't chain two groups together: st3 will start before grp2 finishes
    #chn2 = (st | st2 | grp1 | grp2 | st3 | st4)

    r = chn()
    #r2 = chn2()
w--
  • Thanks for this. Unfortunately for me, my workflow wouldn't allow me to use a 'relevant' task in between the groups, so I ended up creating a fake task `def fake_celery_task(): pass` to run in between the groups... – lukik Nov 03 '14 at 09:15
  • In this scenario, when a group takes 2 tasks and returns the results to the chord, I am assuming the chord gets the results in an arbitrary order. – PirateApp May 03 '18 at 04:51

I have the same issue with Celery, trying to build a workflow whose first step is "spawn a million tasks". I tried groups of groups and subtasks, but eventually my step2 kicks off before step1 is over.

Long story short, I may have found a solution with the use of a chord and a dumb finisher:

@celery.task
def chordfinisher( *args, **kwargs ):
  return "OK"

It does nothing much, but it enables me to do this:

tasks = []
for id in ids:
    tasks.append( mytask.si( id ) )
step1 = chord( group( tasks ), chordfinisher.si() )

step2 = ...

workflow = chain( step1, step2 )

Originally I wanted to have step1 in a subtask, but for the same reason suspected above, the act of calling the group ends immediately, the task is considered finished, and my workflow moves on...

If someone has something better, I'm interested!
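Why this works can be sketched with a plain-Python threading analogy (the helpers below are made up for illustration, not Celery API): the finisher acts as a barrier that only returns once every task in the group header is done, so the next step of the chain cannot start early.

```python
import threading
import time

order = []  # completion order of each piece of work

def subtask(name):
    time.sleep(0.1)        # simulate work inside the group
    order.append(name)

def chordfinisher(threads):
    # The chord body only runs after every header task has finished;
    # here we model that by joining all of the worker threads.
    for t in threads:
        t.join()
    return "OK"

def step1(names):
    threads = [threading.Thread(target=subtask, args=(n,)) for n in names]
    for t in threads:
        t.start()
    return chordfinisher(threads)   # blocks until the whole "group" is done

step1(["a", "b", "c"])
order.append("step2")               # only reached after a, b and c finish
print(order)
```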

Jonathan Adami
  • Hi, this is pretty much what I ended up doing. One thing to keep in mind: you need the dumb finisher to return the result of the group execution; otherwise, if anything in the group fails, your chain will not halt at step1 (this may or may not be what you want). – w-- Oct 08 '13 at 16:19