Consider this Celery workflow:
wf = collect_items.s() | add_details.s() | publish_items.s()
It collects some items, adds extra details to each one in parallel, then publishes the decorated information somewhere.
What I want is for add_details to behave as a group of tasks, one per item, that fetch the details for each item in parallel. Obviously the group has to be generated from the data output by collect_items.
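For reference, here's the plain single-process Python equivalent of what I want the chain to compute; the list comprehension in the middle is the step I want Celery to fan out as a parallel group (function names mirror my tasks below, but these are just stand-ins):

```python
def collect_items(n):
    # stand-in for the collect_items task
    return list(range(n))

def get_details(item):
    # stand-in for the get_details task: pair each item with its square
    return (item, item * item)

def publish_items(items):
    # stand-in for the publish_items task
    print("items = %r" % items)

# Sequential equivalent of the workflow; the middle step is what
# I want to run concurrently, one get_details task per item.
details = [get_details(i) for i in collect_items(10)]
publish_items(details)
```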
Here's what I tried, using the default RabbitMQ broker:
    from celery import Celery, group

    app = Celery(backend="rpc://")

    @app.task
    def collect_items(n):
        return range(n)

    @app.task
    def add_details(items):
        return group(get_details.s(i) for i in items).delay()

    @app.task
    def get_details(item):
        return (item, item * item)

    @app.task
    def publish_items(items):
        print("items = %r" % items)
I want the output to be numbers 0-9 decorated with their squares, all calculated concurrently:
>>> wf.delay(10).get()
items = [(0, 0), (1, 1), (2, 4), ... (8, 64), (9, 81)]
This does invoke the expected tasks, but unfortunately it passes the results to publish_items as a bunch of GroupResults containing AsyncResults with PENDING status, even though the tasks appear to have completed.
I can't wait for those results in publish_items, because you can't call get() inside a task (risk of deadlocks, etc.). I thought Celery would recognise when a task like add_details returns a GroupResult, and get() it before passing the value on to the next task in the chain.
This seems like a common pattern; is there any way to do it in Celery?
I've seen similar questions here, but the answers seem to assume a lot of intimate knowledge of how Celery works under the covers, and they don't work for me anyway.