
I have a Celery application:

@task
def step_one(items):
    items.sort()
    # Yield the sorted items in batches of 100.
    for i in range(0, len(items), 100):
        yield items[i:i + 100]

@task
def step_two(batch):
    for item in batch:
        print(item)

if __name__ == '__main__':
    # This is the chain that does not work.
    celery.chain(step_one.s(list(range(100000))), step_two.map().s()).delay()

This does not work because step_two is expecting an iterator, but I want it to take the results from the previous step. What is the correct way to go about this? It would also be nice if the step_two tasks could start as soon as each batch is yielded, rather than after the whole list has been sorted and chunked.
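To make the intended data flow concrete, here is a minimal plain-Python sketch with no Celery involved. The names `chunked` and `handle_batch` are hypothetical stand-ins for step_one and step_two, and the batch size of 100 is taken from the code above. It only shows the behaviour I am after, not a working Celery solution.

```python
from typing import Iterator, List

BATCH_SIZE = 100  # same batch size as in step_one above


def chunked(items: List[int], size: int = BATCH_SIZE) -> Iterator[List[int]]:
    """Hypothetical stand-in for step_one: sort, then yield batches lazily."""
    ordered = sorted(items)
    for start in range(0, len(ordered), size):
        yield ordered[start:start + size]


def handle_batch(batch: List[int]) -> int:
    """Hypothetical stand-in for step_two: process one batch, return its size."""
    return len(batch)


# Each batch is handled as soon as the generator yields it,
# rather than after every batch has been built.
counts = [handle_batch(batch) for batch in chunked(list(range(250)))]
```

In the Celery version, each call to `handle_batch` would ideally be a separate step_two task that is dispatched as its batch becomes available.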

I have seen the question "Best way to map a generated list to a task in celery", but I feel that it should not be necessary to create an intermediate step just to map the results to another task. Additionally, calling step_two.map.s() gives me the error "function object has no attribute s".

shane
