
I have a working chain, but when I add apply_async() it only executes the first task.

@task(name='run_a', delay=True)
def run_a(**kwargs):
    do_whatever(kwargs['var'])
    return

@task(name='run_b', delay=True)
def run_b(**kwargs):
    # ...
    return

@task(name='run_c', delay=True)
def run_c(**kwargs):
    # ...
    return

With a chain command:

ret = chain(
    run_a.s(**kwargs),
    run_b.s(**kwargs),
    run_c.s(**kwargs)
).apply_async()
  • Without the apply_async it all works (synchronously) as expected.
  • 'kwargs' is a dict.
GerardJP
  • Can somebody confirm it's related to using the kwargs dict? http://stackoverflow.com/questions/14968265/celery-task-chain-and-accessing-kwargs – GerardJP Apr 09 '16 at 09:07
  • When the chain reaches `run_b`, I get a TypeError in the Celery worker log: `TypeError: run_b() takes exactly 0 arguments (8 given)` – GerardJP Apr 09 '16 at 11:33

2 Answers


According to the documentation at http://docs.celeryproject.org/en/master/userguide/canvas.html#chains : "The linked task will be applied with the result of its parent task as the first argument." To stop a linked task from receiving its parent's result as an argument, make its signature immutable with the .si() shortcut. The chain then has to be rewritten as follows:

In [29]: ret = chain(
    ...:     run_a.si(**kwargs),
    ...:     run_b.si(**kwargs),
    ...:     run_c.si(**kwargs)
    ...: ).apply_async()

Result

In [30]: print ret.parent.parent.graph
0e1541f8-93c2-48c9-95b0-7a0a5971d74a(1)
     7b5e11e4-6ccf-49cc-a1dd-42bf407a37de(0)
7b5e11e4-6ccf-49cc-a1dd-42bf407a37de(0)
70a6e66c-1ef9-4814-ae23-9c905ee1fcd5(2)
     0e1541f8-93c2-48c9-95b0-7a0a5971d74a(1)
          7b5e11e4-6ccf-49cc-a1dd-42bf407a37de(0)
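
To make the difference between .s() and .si() concrete, here is a rough plain-Python sketch of the argument passing described in the quoted docs. It does not use Celery at all: `Sig`, `run_chain`, and the task bodies are hypothetical names invented for this illustration, and the real library does considerably more.

```python
# Plain-Python sketch of chain argument passing; this is NOT Celery code.
# Sig, run_chain and the task bodies are hypothetical, for illustration only.

class Sig:
    """Minimal stand-in for a task signature."""

    def __init__(self, func, kwargs, immutable=False):
        self.func = func
        self.kwargs = kwargs
        self.immutable = immutable

    def call(self, parent_result=None, has_parent=False):
        # A mutable signature (like .s()) receives the parent's result as
        # its first positional argument; an immutable one (like .si())
        # ignores the parent's result entirely.
        if has_parent and not self.immutable:
            return self.func(parent_result, **self.kwargs)
        return self.func(**self.kwargs)


def run_chain(*sigs):
    """Call each signature in order, handing each result to the next one."""
    result, has_parent = None, False
    for sig in sigs:
        result = sig.call(result, has_parent)
        has_parent = True
    return result


def run_a(**kwargs):
    return "a:" + kwargs["var"]


def run_b(**kwargs):
    return "b:" + kwargs["var"]


kwargs = {"var": "x"}

# Mutable signatures: run_b is handed run_a's result positionally,
# but run_b accepts keyword arguments only, so the call fails.
try:
    run_chain(Sig(run_a, kwargs), Sig(run_b, kwargs))
    mutable_error = None
except TypeError as exc:
    mutable_error = exc

# Immutable signatures: the parent's result is dropped and the chain completes.
immutable_result = run_chain(
    Sig(run_a, kwargs, immutable=True),
    Sig(run_b, kwargs, immutable=True),
)
```

In this sketch the mutable chain stops at `run_b` with a TypeError, which matches the kind of error reported in the worker log, while the immutable chain runs through to the last task.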
Vu Gia Truong

Celery validates your tasks before execution, but for a task function to work in a chain it is expected to take both *args and **kwargs.

# kwargs was already populated; I added an empty args list
args = []
kwargs = {
    'some': 'intelligent data',
    }

When calling the functions with both, it works as expected:

ret = chain(
    run_a.s(*args, **kwargs),
    run_b.s(*args, **kwargs),
    run_c.s(*args, **kwargs)
).apply_async()
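
As a hedged illustration of why the function signature matters here: assuming the chain hands the previous task's return value to the next task as a leading positional argument (as the Celery canvas docs describe), a function that also accepts *args can absorb that value instead of raising. The sketch below is plain Python with a hypothetical `run_b` body, not a Celery task.

```python
# Plain-Python sketch, not a Celery task; the body of run_b is hypothetical.

def run_b(*args, **kwargs):
    # args[0], when present, is whatever the previous step handed over.
    parent_result = args[0] if args else None
    return parent_result, kwargs["some"]


kwargs = {"some": "intelligent data"}

# Called on its own: no positional arguments arrive.
first = run_b(**kwargs)

# Called as a linked step: one extra positional argument arrives and is accepted.
linked = run_b("result of run_a", **kwargs)
```

With a keyword-only signature such as `def run_b(**kwargs)`, the second call would raise a TypeError instead.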
GerardJP