3

I feel like this is quite a simple use case but not one that I have had success solving from the documentation.

Here is a contrived version of my problem:

I have a task that take a single parameter and returns a list. I want to apply another task to each element in the list. I then (depending on circumstance) may want to continue processing with other tasks.

chain(
    my.tasks.divide.s(data),
    my.tasks.conquer.map(),
    ).apply_async().get()

or

chain(
    my.tasks.divide.s(data),
    my.tasks.conquer.map(),
    my.tasks.unite.s(),
    my.tasks.rule.s(),
    ).apply_async().get()

Map doesn't work that way, but I feel like it should! All I want is to chain from a single param to a list and then back.. Surely this is not difficult to do properly?

The 'solutions' here and here seem over complicated and I can't believe are the best practice.

The documentation on chords, maps, and chains doesn't help with this particular case.

EDIT: What I have so far. The problem here is that I have to call .apply_async().get() twice to get the actual result, as the intermediary step returns a chord not the actual result:

@app.task()
def divide(item):
    return [x for x in item]

@app.task()
def conquer(item):
    return item.upper()

@app.task()
def rule(items):
    return "".join(items)

@app.task()
def conquer_group(items):
    return group([conquer.s(x) for x in items])

@app.task()
def rule_group(items):
    return chord([conquer.s(x) for x in items], rule.s())

def main():
    print chain(
        divide.s("foobarbaz"),
        rule_group.s(),
        ).apply_async().get().apply_async().get()
jsj
  • 9,019
  • 17
  • 58
  • 103
  • Have you tried to [composite your functions](https://stackoverflow.com/questions/16739290/composing-functions-in-python)? – Maor Refaeli Oct 14 '18 at 15:07

1 Answers1

1

The issue here is that map() needs an iterable to be instantiated. That's why one can do (sample.map(range(100)) | another_task.s()).apply_async() but not instantiate a map() signature that accepts the result of the last task in the chain. I think the simples way to do this is to instantiate the map() signature within a task.

@app.task()
def divide(item):
    return [x for x in item]

@app.task()
def conquer(item):
    return item.upper()

@app.task()
def process(items):
    return conquer.map(items)

def run():
    (divide.s("foobar") | process.s()).apply_async()
xirdneh
  • 256
  • 2
  • 3