I am trying the chaining described in the Celery Canvas documentation. Below is my project structure.
proj/
├── __init__.py
├── celery.py
├── celeryconfig.py
├── module1
│   ├── __init__.py
│   └── tasks.py
└── module2
    └── tasks.py
File: celery.py
"""Main entry module for celery tasks."""
from __future__ import absolute_import, unicode_literals
from celery import Celery
from . import celeryconfig

app = Celery('mycelery')
app.config_from_object(celeryconfig)
File: celeryconfig.py
"""Celery Configuration."""
import os

broker_url = os.getenv("CELERY_BROKER_URL")
result_backend = os.getenv("CELERY_BACKEND_URL")
worker_max_tasks_per_child = 4
include = [
    "proj.module1.tasks",
    "proj.module2.tasks",
]
File: module1/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import group
from celery import subtask
from proj.celery import app


@app.task
def my_workflow(x, y):
    # Build a two-step chain and route it to the "myq" queue.
    (mul.s(x, y) | add.s(5)).apply_async(queue="myq")
    # (get_array.s(x, y) | dmap.s(add.s())).apply_async(queue="myq")


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def get_array(x, y):
    return [(x, x)] * y


@app.task
def dmap(listResp, processFunc):
    # Apply processFunc to each element of listResp as a parallel group.
    callback = subtask(processFunc)
    return group(callback.clone([arg]) for arg in listResp)()
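For reference, the workflow task itself is meant to be invoked like this (an illustrative call, not part of the transcripts below); it runs as a task of its own and schedules the inner chain on myq via apply_async(queue="myq"):

from proj.module1.tasks import my_workflow

my_workflow.apply_async(args=(4, 5), queue="myq")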
I installed a virtualenv to run my Celery tasks; it has Celery 4.4.2 installed.
$ virtualenv/bin/pip freeze
amqp==2.5.2
appnope==0.1.0
backcall==0.2.0
billiard==3.6.3.0
celery==4.4.2
decorator==4.4.2
importlib-metadata==1.6.0
ipython==7.16.1
ipython-genutils==0.2.0
jedi==0.17.1
kombu==4.6.8
parso==0.7.0
pexpect==4.8.0
pickleshare==0.7.5
prompt-toolkit==3.0.5
ptyprocess==0.6.0
Pygments==2.6.1
pytz==2020.1
redis==3.5.3
six==1.15.0
traitlets==4.3.3
vine==1.3.0
wcwidth==0.2.5
zipp==3.1.0
First, I run a worker without binding it to a specific queue.
$ virtualenv/bin/celery worker -A proj -l INFO
celery@MQM1CPG8WL v4.4.2 (cliffs)
Darwin-18.7.0-x86_64-i386-64bit 2020-07-11 00:38:04
[config]
.> app: mycelery:0x1115be410
.> transport: amqp://dev:**@127.0.0.1:5672/my-dev
.> results: redis://127.0.0.1/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. proj.module1.tasks.add
. proj.module1.tasks.dmap
. proj.module1.tasks.get_array
. proj.module1.tasks.mul
. proj.module1.tasks.my_workflow
[2020-07-11 00:38:04,300: INFO/MainProcess] Connected to amqp://dev:**@127.0.0.1:5672/my-dev
[2020-07-11 00:38:04,317: INFO/MainProcess] mingle: searching for neighbors
[2020-07-11 00:38:05,369: INFO/MainProcess] mingle: all alone
[2020-07-11 00:38:05,412: INFO/MainProcess] celery@MQM1CPG8WL ready.
[2020-07-11 00:38:07,240: INFO/MainProcess] Events of group {task} enabled by remote.
Then I try to use a chain.
$ virtualenv/bin/ipython
Python 3.7.4 (default, Dec 9 2019, 10:50:13)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.16.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: from proj.module1.tasks import add, mul
In [2]: res = (add.s(4, 4) | mul.s(8) | mul.s(10)).apply_async()
In [3]: res.get()
Out[3]: 640
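As an aside, the pipe syntax is just shorthand for an explicit chain; both build the same signature:

from celery import chain
from proj.module1.tasks import add, mul

# Equivalent to (add.s(4, 4) | mul.s(8) | mul.s(10)).apply_async()
res = chain(add.s(4, 4), mul.s(8), mul.s(10)).apply_async()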
The Celery worker log shows all three tasks of the chain being received and executed:
[2020-07-11 00:41:40,051: INFO/MainProcess] Received task: proj.module1.tasks.add[dcedd03b-405d-4490-a1dd-5f697cdf9444]
[2020-07-11 00:41:40,093: INFO/MainProcess] Received task: proj.module1.tasks.mul[8019e971-8347-4a1d-b7db-9d62fb147417]
[2020-07-11 00:41:40,094: INFO/ForkPoolWorker-8] Task proj.module1.tasks.add[dcedd03b-405d-4490-a1dd-5f697cdf9444] succeeded in 0.04052638599998204s: 8
[2020-07-11 00:41:40,129: INFO/MainProcess] Received task: proj.module1.tasks.mul[6a5f129b-07c5-43df-a55c-525a67fe9feb]
[2020-07-11 00:41:40,131: INFO/ForkPoolWorker-1] Task proj.module1.tasks.mul[8019e971-8347-4a1d-b7db-9d62fb147417] succeeded in 0.03607683999999267s: 64
[2020-07-11 00:41:40,132: INFO/ForkPoolWorker-8] Task proj.module1.tasks.mul[6a5f129b-07c5-43df-a55c-525a67fe9feb] succeeded in 0.001778557999983832s: 640
Now, instead of listening on the default celery queue, I want my worker to listen on myq. I started the worker with -Q myq as below.
$ virtualenv/bin/celery worker -A proj -l INFO -Q myq
celery@MQM1CPG8WL v4.4.2 (cliffs)
Darwin-18.7.0-x86_64-i386-64bit 2020-07-11 00:43:45
[config]
.> app: mycelery:0x10dce2490
.> transport: amqp://dev:**@127.0.0.1:5672/my-dev
.> results: redis://127.0.0.1/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> myq exchange=myq(direct) key=myq
[tasks]
. proj.module1.tasks.add
. proj.module1.tasks.dmap
. proj.module1.tasks.get_array
. proj.module1.tasks.mul
. proj.module1.tasks.my_workflow
[2020-07-11 00:43:45,587: INFO/MainProcess] Connected to amqp://dev:**@127.0.0.1:5672/my-dev
[2020-07-11 00:43:45,601: INFO/MainProcess] mingle: searching for neighbors
[2020-07-11 00:43:46,658: INFO/MainProcess] mingle: all alone
[2020-07-11 00:43:46,701: INFO/MainProcess] celery@MQM1CPG8WL ready.
[2020-07-11 00:43:47,261: INFO/MainProcess] Events of group {task} enabled by remote.
Now I am trying the same chain, this time routed to the myq queue.
$ virtualenv/bin/ipython
Python 3.7.4 (default, Dec 9 2019, 10:50:13)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.16.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: from proj.module1.tasks import add, mul
In [2]: res = (add.s(4, 4) | mul.s(8) | mul.s(10)).apply_async(queue="myq")
In [3]: res.get()
This res.get() call stays stuck for a long time. When I check the worker log, it has only one task entry:
[2020-07-11 00:45:45,700: INFO/MainProcess] Received task: proj.module1.tasks.add[d58fba45-429b-4c3c-8598-3a24916e6fce]
[2020-07-11 00:45:45,747: INFO/ForkPoolWorker-8] Task proj.module1.tasks.add[d58fba45-429b-4c3c-8598-3a24916e6fce] succeeded in 0.044210322999987284s: 8
This indicates that only the first task reached the myq queue.
When I check the pending messages, the celery queue has 1 message:
[
  [8, 8],
  {},
  {
    "callbacks": null,
    "errbacks": null,
    "chain": [
      {
        "task": "proj.module1.tasks.mul",
        "args": [10],
        "kwargs": {},
        "options": {
          "task_id": "efd420e4-845c-4198-8f97-e8ae0015f9e9",
          "reply_to": "26aee9c3-21e3-3ce4-a1b1-a3bef6e7e5b0"
        },
        "subtask_type": null,
        "chord_size": null,
        "immutable": false
      }
    ],
    "chord": null
  }
]
Somehow, the next message in the chain was routed to the celery queue instead of myq.
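As far as I can tell, the queue="myq" option is only applied to the first task when the chain is published; note that the embedded mul entry above carries only task_id and reply_to in its options, with no queue. A quick diagnostic sketch (a chain exposes its signatures via .tasks in 4.4):

from proj.module1.tasks import add, mul

sig = add.s(4, 4) | mul.s(8) | mul.s(10)
# apply_async(queue="myq") only affects delivery of the first task;
# the chained signatures keep their own (empty) options:
print(sig.tasks[-1].options)  # {} -- no 'queue' key, so the default
                              # "celery" queue is used for the rest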
How do I tell the chain to send all of its tasks to myq?
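The only workaround I can think of is to set the queue explicitly on every signature with Signature.set(), which attaches per-task execution options (untested sketch):

from proj.module1.tasks import add, mul

res = (
    add.s(4, 4).set(queue="myq")
    | mul.s(8).set(queue="myq")
    | mul.s(10).set(queue="myq")
).apply_async()

But that gets verbose for long chains. Is there a way to set the queue once for the whole chain?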