
There are some discrepancies in the way the Celery canvas works in async and eager mode. I've noticed that when a dynamic task replaces itself with a group followed by a chain, the results are not passed along to the next task in the chain.

That sounds complicated, so let me show an example.

Given the following task:

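from celery import group, shared_task

# asum and amul are tasks from the sample project linked below.
@shared_task(bind=True)
def grouped(self, val):
    task = (
        group(asum.s(val, n) for n in range(val)) | asum.s(val)
    )
    raise self.replace(task)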

when it's grouped in another canvas like this:

@shared_task(bind=True)
def flow(self, val):
    workflow = (asum.s(1, val) |
                asum.s(2) |
                grouped.s() |
                amul.s(3))

    return self.replace(workflow)

the amul task does not receive the result of grouped when running in eager mode.

To really illustrate the issue, I've created a sample project on GitHub where you can dig into the problem and help me out with some quick solutions and, possibly, some PRs to the Celery project.

https://github.com/gutomaia/celery_equation

---- edited ----

In the project, I show how the behavior differs between the two ways of using Celery. In async mode, those tasks work as expected:

>>> from equation.main import *
>>> from equation.tasks import *
>>> flow.delay(1).get()
78
>>> flow.delay(2).get()
120
>>> flow.delay(100).get()
47895
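For reference, the three values above can be reproduced without Celery. This is only a sketch under assumptions: it guesses that asum adds up its arguments (flattening the list of results that the group hands over) and that amul multiplies its two arguments. The plain_* names are hypothetical stand-ins, not the code from the sample project.

```python
# Plain-Python model of the canvas, with no Celery involved.
# Assumption: asum sums its arguments, flattening any list it receives,
# and amul multiplies its two arguments. All names here are hypothetical.

def plain_asum(*args):
    total = 0
    for arg in args:
        # A group passes its results to the next task as a single list.
        total += sum(arg) if isinstance(arg, (list, tuple)) else arg
    return total


def plain_amul(a, b):
    return a * b


def plain_grouped(val):
    # group(asum.s(val, n) for n in range(val)) | asum.s(val)
    group_results = [plain_asum(val, n) for n in range(val)]
    return plain_asum(group_results, val)


def plain_flow(val):
    # asum.s(1, val) | asum.s(2) | grouped.s() | amul.s(3)
    # Each step receives the previous result as its first argument.
    step = plain_asum(1, val)
    step = plain_asum(step, 2)
    step = plain_grouped(step)
    return plain_amul(step, 3)


if __name__ == "__main__":
    for val in (1, 2, 100):
        print(val, plain_flow(val))
```

Under those assumptions the model agrees with the async results for 1, 2 and 100, which is what eager mode would also be expected to return.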
gutomaia

3 Answers


I was struggling with this situation in a test case. For future readers: at least as of Celery 4.4.0, the following idiom works in all contexts, including synchronous, in-process execution:

    return self.replace(...)

Using raise or simply letting the function end right after Task.replace will only work in asynchronous mode. The relevant code is right at the end of Task.replace:

        if self.request.is_eager:
            return sig.apply().get()
        else:
            sig.delay()
            raise Ignore('Replaced by new task')
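To make the consequence of that branch concrete, here is a toy model in plain Python. It is a sketch, not Celery's implementation: the Ignore class and the replace function below are hypothetical local stand-ins that only mimic the control flow quoted above (the real code also sends the replacement signature to the broker before raising).

```python
# Toy model of the tail of Task.replace. Nothing here imports Celery;
# Ignore and replace are hypothetical stand-ins for illustration only.

class Ignore(Exception):
    """Local stand-in for celery.exceptions.Ignore."""


def replace(is_eager, run_replacement):
    if is_eager:
        # Eager: run the replacement inline and hand back its result.
        return run_replacement()
    # Async: the real code dispatches the signature here, then raises.
    raise Ignore("Replaced by new task")


def task_with_return(is_eager):
    return replace(is_eager, lambda: 42)


def task_with_raise(is_eager):
    # In eager mode replace() returns 42, so this becomes `raise 42`.
    raise replace(is_eager, lambda: 42)


def outcome(task, is_eager):
    try:
        return ("value", task(is_eager))
    except Ignore:
        return ("ignored", None)
    except TypeError:
        # Raising a non-exception object is a TypeError in Python.
        return ("type_error", None)


if __name__ == "__main__":
    for task in (task_with_return, task_with_raise):
        for is_eager in (True, False):
            print(task.__name__, is_eager, outcome(task, is_eager))
```

In this model both styles behave the same in async mode, because replace raises before the caller's own return or raise takes effect. Only the return style passes the value on in eager mode.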

Sadly, eager mode will never be the same as running an actual worker. There are too many intricacies in running an actual worker for eager mode to behave exactly the same way. I agree that cases like this should be handled specially in eager mode, but some discrepancy is to be expected. Please submit a PR if you know how to fix this issue and we can review the fix there. Thank you!

xirdneh

grouped() is not returning anything, so how do you expect amul to get the result?

DejanLekic
  • The self.replace syntactic sugar can be used in both contexts. It should not matter whether replace is used with return or with raise. In the project at the attached link, you can see that the tasks do work in async mode. The question is about the difference in behavior between eager and async mode. What I would expect is a small monkey-patch fix that solves this particular case. – gutomaia Jul 12 '19 at 14:03