2

I want to design celery canvas where there might be chord/group in middle of the task and then process further.

               is_even(num1)
get_numbers -> is_even(num2) -> filter_even -> xsum
               is_even(num3)

Directory structure is like this

myprj
├── __init__.py
├── celery.py
├── celeryconfig.py
└── tasks.py

myprj.celery Module

from __future__ import absolute_import, unicode_literals

from celery import Celery

app = Celery('proj',
             broker='amqp://dev:dev@127.0.0.1/storage-collector-dev',
             backend='redis://127.0.0.1/0',
             include=['myprj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

myprj.celeryconfig module

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

myprj.tasks module

from __future__ import absolute_import, unicode_literals

from .celery import app

from celery import chord

@app.task
def get_numbers():
    return list(range(10))

@app.task
def is_even(x):
    return x % 2 == 0

@app.task
def filter_even(evenResults, allNumbers):
    return [x for isEven, x in zip(evenResults, allNumbers)
            if isEven]

@app.task
def is_even_group(allNumbers):
    return group(is_even.s(x) for x in allNumbers)

@app.task
def get_chord(allNumbers):
    return chord(is_even_group(allNumbers))(
        filter_even.s(allNumbers))

@app.task
def xsum(numbers):
    return sum(numbers)

@app.task
def get_chord_with_sum(allNumbers):
    return chord(is_even_group(allNumbers))(
        filter_even.s(allNumbers) | xsum.s())

@app.task
def main_task():
    return (get_numbers.s() | get_chord.s())

@app.task
def main_task_with_sum():
    return (get_numbers.s() | get_chord_with_sum())

When I try to run this individually as tasks, they are working fine.

$ virtualenv/bin/celery -A myprj worker -l info
In [1]: from myprj.tasks import *

In [2]: get_numbers.s()()
Out[2]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [3]: get_chord.s([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])().get()
Out[3]: [0, 2, 4, 6, 8]

In [4]: get_chord_with_sum.s([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])().get()
Out[4]: 20

But in get_chord_with_sum, xsum was added as callback function, What design I am looking for is (get_numbers.s() | get_chord.s() | xsum.s()) where xsum is not part of the callback.

If we can make it more simple as (get_numbers.s() | is_even_group.s() | filter_even.s() | xsum.s())

Question 1: How to make continue in chain after chord/group?

When I tried to run the chain with chord, it return me some task ids only.

In [5]: main_task()().get()
Out[5]:
[['47ece1df-be30-449a-bc86-a0cd68806a9c',
  [['867f8728-35de-4228-a1d5-a48a79890f97', None],
   [[['18bfe31d-2601-4e3f-825d-c15d87840abb', None], None],
    [['62e5671c-b2b3-4597-940d-df3b10393a20', None], None],
    [['1e419913-2f8d-4b8b-9c0b-d73ec0be3971', None], None],
    [['a5b64d9a-5815-4aa6-9d6c-9a2125a94238', None], None],
    [['1d378b19-de2d-4562-9c71-a2f38435924f', None], None],
    [['94425456-46ee-41d7-8b3f-7bddd310d09d', None], None],
    [['a60fa629-ce5a-42fc-90d4-e150d838c7a3', None], None],
    [['8ed93800-3b13-41ed-8bef-123cdb4f4cbf', None], None],
    [['dbc7e807-6e48-422d-a363-153de0dc1909', None], None],
    [['37bc0603-39a4-4600-816b-3cb1d03a5301', None], None]]]],
 None]

On console, it display the results

[2021-03-12 16:22:19,241: INFO/ForkPoolWorker-8] Task myprj.tasks.get_numbers[1bd444bd-3d5a-4b24-9f3f-4f606dbaa7f2] succeeded in 0.0044793810000101075s: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[2021-03-12 16:22:19,252: INFO/MainProcess] Received task: myprj.tasks.is_even[18bfe31d-2601-4e3f-825d-c15d87840abb]
[2021-03-12 16:22:19,252: INFO/MainProcess] Received task: myprj.tasks.is_even[62e5671c-b2b3-4597-940d-df3b10393a20]
[2021-03-12 16:22:19,253: INFO/MainProcess] Received task: myprj.tasks.is_even[1e419913-2f8d-4b8b-9c0b-d73ec0be3971]
[2021-03-12 16:22:19,255: INFO/MainProcess] Received task: myprj.tasks.is_even[a5b64d9a-5815-4aa6-9d6c-9a2125a94238]
[2021-03-12 16:22:19,258: INFO/MainProcess] Received task: myprj.tasks.is_even[1d378b19-de2d-4562-9c71-a2f38435924f]
[2021-03-12 16:22:19,259: INFO/MainProcess] Received task: myprj.tasks.is_even[94425456-46ee-41d7-8b3f-7bddd310d09d]
[2021-03-12 16:22:19,260: INFO/MainProcess] Received task: myprj.tasks.is_even[a60fa629-ce5a-42fc-90d4-e150d838c7a3]
[2021-03-12 16:22:19,262: INFO/MainProcess] Received task: myprj.tasks.is_even[8ed93800-3b13-41ed-8bef-123cdb4f4cbf]
[2021-03-12 16:22:19,263: INFO/MainProcess] Received task: myprj.tasks.is_even[dbc7e807-6e48-422d-a363-153de0dc1909]
[2021-03-12 16:22:19,263: INFO/MainProcess] Received task: myprj.tasks.is_even[37bc0603-39a4-4600-816b-3cb1d03a5301]
[2021-03-12 16:22:19,265: INFO/ForkPoolWorker-3] Task myprj.tasks.is_even[1e419913-2f8d-4b8b-9c0b-d73ec0be3971] succeeded in 0.011537615000008827s: True
[2021-03-12 16:22:19,265: INFO/ForkPoolWorker-2] Task myprj.tasks.is_even[62e5671c-b2b3-4597-940d-df3b10393a20] succeeded in 0.011093899000002239s: False
[2021-03-12 16:22:19,266: INFO/ForkPoolWorker-8] Task myprj.tasks.is_even[18bfe31d-2601-4e3f-825d-c15d87840abb] succeeded in 0.01203034400003844s: True
[2021-03-12 16:22:19,268: INFO/ForkPoolWorker-1] Task myprj.tasks.get_chord[89c8700b-69ce-4889-957f-e9f0db7b1cad] succeeded in 0.02606777100004365s: <AsyncResult: 47ece1df-be30-449a-bc86-a0cd68806a9c>
[2021-03-12 16:22:19,271: INFO/ForkPoolWorker-4] Task myprj.tasks.is_even[a5b64d9a-5815-4aa6-9d6c-9a2125a94238] succeeded in 0.014094042000010631s: False
[2021-03-12 16:22:19,272: INFO/ForkPoolWorker-5] Task myprj.tasks.is_even[1d378b19-de2d-4562-9c71-a2f38435924f] succeeded in 0.011506181000015658s: True
[2021-03-12 16:22:19,274: INFO/ForkPoolWorker-7] Task myprj.tasks.is_even[a60fa629-ce5a-42fc-90d4-e150d838c7a3] succeeded in 0.012725966999994398s: True
[2021-03-12 16:22:19,275: INFO/ForkPoolWorker-6] Task myprj.tasks.is_even[94425456-46ee-41d7-8b3f-7bddd310d09d] succeeded in 0.014057033000028696s: False
[2021-03-12 16:22:19,277: INFO/ForkPoolWorker-2] Task myprj.tasks.is_even[dbc7e807-6e48-422d-a363-153de0dc1909] succeeded in 0.009334530999979052s: True
[2021-03-12 16:22:19,278: INFO/ForkPoolWorker-3] Task myprj.tasks.is_even[37bc0603-39a4-4600-816b-3cb1d03a5301] succeeded in 0.010436902999970243s: False
[2021-03-12 16:22:19,284: INFO/ForkPoolWorker-8] Task myprj.tasks.is_even[8ed93800-3b13-41ed-8bef-123cdb4f4cbf] succeeded in 0.017027554000037526s: False
[2021-03-12 16:22:19,286: INFO/MainProcess] Received task: myprj.tasks.filter_even[47ece1df-be30-449a-bc86-a0cd68806a9c]
[2021-03-12 16:22:19,290: INFO/ForkPoolWorker-8] Task myprj.tasks.filter_even[47ece1df-be30-449a-bc86-a0cd68806a9c] succeeded in 0.0037238450000245393s: [0, 2, 4, 6, 8]

Question 2: How to get results if chord is part of chain ?

Combine chord with chain might solve this.

Nilesh
  • 20,521
  • 16
  • 92
  • 148

0 Answers0