There are some discrepancies in the way the Celery canvas works in async and eager mode. I've noticed that when a dynamic task replaces itself with a group followed by a chain, the results are not passed along to the next task in the chain.
Well, that seems complicated, let me show an example:
Given the following task:
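from celery import group, shared_task

@shared_task(bind=True)
def grouped(self, val):
    # Fan out one asum per n, then feed the collected results into a final asum.
    task = (
        group(asum.s(val, n) for n in range(val)) | asum.s(val)
    )
    # Replace this task with the canvas built above.
    raise self.replace(task)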
when it's composed into another canvas like this:
@shared_task(bind=True)
def flow(self, val):
    workflow = (asum.s(1, val) |
                asum.s(2) |
                grouped.s() |
                amul.s(3))
    return self.replace(workflow)
the task amul does not receive the results from grouped when running in eager mode.
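For context, eager mode is normally switched on through the task_always_eager setting, which makes delay() and apply_async() run the task locally and synchronously instead of sending it to a worker. The fragment below is only a sketch of that configuration: the app name "equation" is an assumption, since the post does not show how the sample project sets up its app.

```python
from celery import Celery

# Hypothetical app name; the real project may configure its app differently.
app = Celery("equation")

# Run tasks locally and synchronously instead of sending them to a worker.
app.conf.task_always_eager = True

# Re-raise exceptions from eagerly executed tasks so failures are visible.
app.conf.task_eager_propagates = True
```

With these settings, flow.delay(1) executes in the calling process, which is the mode where the problem described above shows up.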
To really illustrate the issue, I've created a sample project on GitHub where you can dig into the problem and help me out with some quick solutions and, possibly, some PRs to the Celery project.
https://github.com/gutomaia/celery_equation
---- edited ----
In the project, I show how the behavior differs between the two ways of using Celery. In async mode, those tasks work as expected.
>>> from equation.main import *
>>> from equation.tasks import *
>>> flow.delay(1).get()
78
>>> flow.delay(2).get()
120
>>> flow.delay(100).get()
47895
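To make it clear what the correct result should be, here is a plain-Python model of the same workflow, with no Celery involved. It is only a sketch: the bodies of asum and amul are not shown in this post, so the versions below are assumptions (asum adds up all of its arguments, flattening any list it receives from the group; amul multiplies them). Under those assumptions the model reproduces the async results above, and these are the values eager mode should return as well.

```python
from functools import reduce
from operator import mul


def flatten(args):
    """Yield the numbers in args, descending into nested lists or tuples."""
    for a in args:
        if isinstance(a, (list, tuple)):
            yield from flatten(a)
        else:
            yield a


def asum(*args):
    # Assumed behavior: sum every argument, including the list a group returns.
    return sum(flatten(args))


def amul(*args):
    # Assumed behavior: multiply every argument together.
    return reduce(mul, flatten(args), 1)


def grouped(val):
    # Models group(asum.s(val, n) for n in range(val)) | asum.s(val):
    # the group's results arrive as a list in the first argument of asum.
    header = [asum(val, n) for n in range(val)]
    return asum(header, val)


def flow(val):
    # Models the chain: each step receives the previous result first.
    result = asum(1, val)     # asum.s(1, val)
    result = asum(result, 2)  # asum.s(2)
    result = grouped(result)  # grouped.s()
    return amul(result, 3)    # amul.s(3)


print(flow(1))    # 78
print(flow(2))    # 120
print(flow(100))  # 47895
```

For example, flow(1) computes 1 + 1 = 2, then 2 + 2 = 4, then grouped(4) = (4 + 5 + 6 + 7) + 4 = 26, and finally 26 * 3 = 78.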