
I have multiple tasks in Celery, which I run using a mix of chains, groups and chords. However, I still don't understand how I can access all of the tasks that were executed in the canvas, in the order they were executed.

I need to access all of the tasks because I need the result and the arguments of each one.

To investigate, I created a separate Celery project with simpler tasks.

Below are all of my simpler tasks:

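from celery import Celery

# rabbit_url (RabbitMQ broker) and redis_url (Redis result backend) are defined elsewhere
celery_app = Celery('celery_test', broker=rabbit_url, backend=redis_url)

# Store extended task metadata (name, args, kwargs, ...) so AsyncResult.args is available
celery_app.conf.update(
    result_extended=True
)

# Redis backend option: return chord header results in the order the group defines them
celery_app.conf.result_backend_transport_options = {
    'result_chord_ordered': True
}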

@celery_app.task(bind=True)
def add(self, x, y):
    print(f"Add {x}, {y} --- {self.request.id}")
    return x + y


@celery_app.task(bind=True)
def subtract(self, x, y):
    return x - y


@celery_app.task(bind=True)
def multiply(self, x, y):
    print(f"Multiply {x}, {y} --- {self.request.id}")
    return x * y


@celery_app.task(bind=True)
def divide(self, x, y):
    return x / y


@celery_app.task(bind=True)
def xsum(self, numbers):
    print(f"Xsum {numbers} --- {self.request.id}")
    return sum(numbers)

Below are some experiments with the Celery canvas:

from celery import chain, group

num1 = 3
num2 = 4
num3 = 7

# A group piped into a task is upgraded to a chord, with the group as its header
pipeline = (
    group(
        add.s(num1, num2),
        add.s(num1, num3)
    ) |
    xsum.s() | multiply.s(num3)
)()

# In a chain, each partial signature receives the previous task's result as its first argument
pipeline2 = chain(
    add.s(2, 2), 
    add.s(4), 
    add.s(8)
)()

print(pipeline)
result = pipeline.get()
print(result)
parent1 = pipeline.parent
print(parent1)
print(type(parent1))

for res in parent1.results:
    print(f"Id = {res.id} --- Args = {res.args} ---- Result = {res.get()}")

print(pipeline.parent.parent)
print()
print(f"Res: {pipeline2.parent.parent.get()} -- Args = {pipeline2.parent.parent.args}")
print(f"Res: {pipeline2.parent.get()} -- Args = {pipeline2.parent.args}")
print(f"Res: {pipeline2.get()} -- Args = {pipeline2.args}")
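The manual `.parent` / `.parent.parent` hops above can be generalized into a loop. What follows is a minimal sketch, not a Celery API: `walk_parents` is a hypothetical helper. It relies only on the `.parent` attribute of results and on the `.results` list that `GroupResult` exposes, and it assumes each group member is a single task rather than a nested chain. It can only collect what the parent links expose. Per the output below, the parent of the `multiply` result in `pipeline` is the `GroupResult`, so `xsum` would not be visited.

```python
from typing import Any, List


def walk_parents(result: Any) -> List[Any]:
    """Hypothetical helper (not part of Celery): follow ``.parent`` links from
    the final result back to the root and return the nodes oldest-first.

    A node exposing a ``.results`` list (as Celery's GroupResult does) is
    replaced by its members, in the order they are listed.
    """
    # Walk from the last result towards the root of the workflow.
    lineage = []
    node = result
    while node is not None:
        lineage.append(node)
        node = getattr(node, "parent", None)

    # Reverse so the earliest node comes first, expanding groups in place.
    ordered = []
    for node in reversed(lineage):
        members = getattr(node, "results", None)
        if members is not None:
            ordered.extend(members)
        else:
            ordered.append(node)
    return ordered
```

With `result_extended=True`, a loop such as `for r in walk_parents(pipeline2): print(r.id, r.args, r.get())` should list the three `add` calls oldest-first.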

Here is the output of the code above:

03e6481d-f2c9-4218-a834-733a7a17c49a
119
41fd4790-07db-47b9-a8bf-e4cb31f499ad
<class 'celery.result.GroupResult'>
Id = 667eb4c9-48b6-45d9-8704-e101cabf2899 --- Args = [3, 4] ---- Result = 7
Id = 31dbaeb8-5bfe-40bd-ae11-80b9c9607350 --- Args = [3, 7] ---- Result = 10
None

Res: 4 -- Args = [2, 2]
Res: 8 -- Args = [4, 4]
Res: 16 -- Args = [8, 8]

The first two lines (03e6481d-f2c9-4218-a834-733a7a17c49a and 119) make sense: they are the id and the result of the multiply task.
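To check the arithmetic behind 119, here is a plain-Python replay of `pipeline`. Ordinary functions stand in for the tasks, and no broker or worker is involved, so it is illustrative only. It assumes the documented canvas behaviour: a group piped into a task becomes a chord whose header results are passed to the body as a single list, and the following partial signature receives the previous result as its first argument.

```python
# Plain Python stand-ins for the tasks above (no broker, no worker).
def add(x, y):
    return x + y


def multiply(x, y):
    return x * y


def xsum(numbers):
    return sum(numbers)


num1, num2, num3 = 3, 4, 7

# Chord header: the group's tasks run independently of each other.
header_results = [add(num1, num2), add(num1, num3)]  # [7, 10]

# Chord body: xsum receives the list of header results as its first argument.
total = xsum(header_results)  # 17

# Next link: multiply.s(num3) gets the previous result prepended -> multiply(17, 7).
final = multiply(total, num3)  # 119

print(header_results, total, final)
```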

The next two lines (41fd4790-07db-47b9-a8bf-e4cb31f499ad and <class 'celery.result.GroupResult'>) do not make sense to me. Why is the group the parent of the multiply task, rather than xsum?

The next three lines make sense: the first two are the results of the group's tasks, and None is printed because the group has no parent.

The final three lines also make sense. The parent-child relationships between the chained tasks are as expected, and everything is printed as it should be.
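The `Args` values in those last three lines follow from how a chain passes results along. The previous task's return value is prepended to the arguments of the next partial signature, so `add.s(4)` runs as `add(4, 4)`. Below is a plain-Python sketch of that rule. An ordinary function stands in for the task, and `links` is a hypothetical list describing the chain.

```python
def add(x, y):
    return x + y


# Hypothetical description of pipeline2: (function, arguments given in the signature).
links = [(add, (2, 2)), (add, (4,)), (add, (8,))]

observed = []
previous = None
for index, (func, extra) in enumerate(links):
    # The first link runs with its own args; later links get the previous result prepended.
    args = extra if index == 0 else (previous,) + extra
    previous = func(*args)
    observed.append((list(args), previous))

for args, res in observed:
    print(f"Res: {res} -- Args = {args}")
```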

I still don't understand why the Celery canvas works that way, but setting that aside: is there any way to access all of the tasks (with their results and arguments) in a Celery workflow, in the order they were executed?

Thanks a lot in advance
