I feel like this is quite a simple use case but not one that I have had success solving from the documentation.
Here is a contrived version of my problem:
I have a task that take a single parameter and returns a list. I want to apply another task to each element in the list. I then (depending on circumstance) may want to continue processing with other tasks.
chain(
my.tasks.divide.s(data),
my.tasks.conquer.map(),
).apply_async().get()
or
chain(
my.tasks.divide.s(data),
my.tasks.conquer.map(),
my.tasks.unite.s(),
my.tasks.rule.s(),
).apply_async().get()
Map doesn't work that way, but I feel like it should! All I want is to chain from a single param to a list and then back.. Surely this is not difficult to do properly?
The 'solutions' here and here seem over complicated and I can't believe are the best practice.
The documentation on chords, maps, and chains doesn't help with this particular case.
EDIT: What I have so far. The problem here is that I have to call .apply_async().get()
twice to get the actual result, as the intermediary step returns a chord not the actual result:
@app.task()
def divide(item):
return [x for x in item]
@app.task()
def conquer(item):
return item.upper()
@app.task()
def rule(items):
return "".join(items)
@app.task()
def conquer_group(items):
return group([conquer.s(x) for x in items])
@app.task()
def rule_group(items):
return chord([conquer.s(x) for x in items], rule.s())
def main():
print chain(
divide.s("foobarbaz"),
rule_group.s(),
).apply_async().get().apply_async().get()