I'm designing a Celery workflow made up of four tasks: test_request_task, test_chord_callback_task, test_chain_of_chord_callback_task and test_chain_of_chords_task.
I need to run test_request_task n times per second (1 <= n <= 100) for m seconds (5 <= m <= 30). I have tried building the workflow with Celery's group, chain and chord primitives.
With small values of m and n there is no issue, but as they grow, task execution gets slower and eventually fails with PreconditionFailed(406, 'PRECONDITION_FAILED - message size is larger than configured max size 134217728', (60, 40), 'Basic.publish').
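For scale, the limit quoted in the error, 134217728 bytes, is 128 MiB. The following is a minimal sketch for estimating how large a payload is once serialized, assuming JSON encoding (Celery's default serializer). `serialized_size` and `fits_in_one_message` are hypothetical helper names, and the sample dict is only a stand-in, not Celery's actual message format, which also carries headers and any nested signatures.

```python
import json

# Limit quoted in the PRECONDITION_FAILED error, in bytes (128 MiB).
RABBITMQ_MAX_MESSAGE_SIZE = 134_217_728


def serialized_size(payload) -> int:
    """Return the size in bytes of payload when JSON-encoded as UTF-8."""
    return len(json.dumps(payload).encode("utf-8"))


def fits_in_one_message(payload, limit: int = RABBITMQ_MAX_MESSAGE_SIZE) -> bool:
    """Return True if the JSON-encoded payload is no larger than the limit."""
    return serialized_size(payload) <= limit


# Hypothetical stand-in for one task's arguments; NOT Celery's wire format.
sample = {"task": "test_request_task", "args": [0], "kwargs": {}}
print(serialized_size(sample), fits_in_one_message(sample))
```

A single task's arguments are tiny compared with the limit, which suggests the size comes from how the canvas as a whole is serialized rather than from any one task's payload.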
I've worked around the problem by breaking the workflow down and delegating the pieces to intermediate tasks, but I'd like to know what I was doing wrong. Any suggestions with relevant best practices are appreciated. Raising RabbitMQ's maximum message size doesn't seem like an ideal option, as the actual payload may grow dynamically.
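As an illustration of what such a decomposition can look like (a sketch under assumptions, not necessarily the one described above): each intermediate step receives only three integers, builds one second's batch at run time, and hands three integers to the next step, so no single message has to carry the whole canvas. The plain-Python model below uses a deque in place of the broker; `plan_step` and `simulate` are hypothetical names.

```python
from collections import deque


def plan_step(start: int, group_size: int, remaining: int):
    """Return (serials for this step, args for the next step or None)."""
    serials = list(range(start, start + group_size))
    if remaining > 1:
        next_args = (start + group_size, group_size, remaining - 1)
    else:
        next_args = None  # last step: nothing left to enqueue
    return serials, next_args


def simulate(group_size: int, chain_size: int):
    """Run the steps through an in-memory queue standing in for the broker."""
    queue = deque([(0, group_size, chain_size)])
    batches = []
    while queue:
        args = queue.popleft()  # each "message" is just three integers
        serials, next_args = plan_step(*args)
        batches.append(serials)
        if next_args is not None:
            queue.append(next_args)
    return batches


batches = simulate(group_size=5, chain_size=8)
print(len(batches), batches[0], batches[-1])
```

In a real Celery version of this idea, each step would presumably launch one chord for its batch and have the chord callback enqueue the next step; that wiring is left out here because it depends on the application.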
I'm using RabbitMQ as the broker and Redis as the result backend (I keep the result only for the one task that runs inside the chord). My Celery config and tasks are below:
import time

import requests
from celery import Celery, chain, chord, group

CELERY_BROKER: str = "pyamqp://xxx:Zzzz1234@rabbitmq:5672//"
CELERY_BACKEND: str = "redis://redis:6379"

celery_app = Celery(__name__, broker=CELERY_BROKER, backend=CELERY_BACKEND)
celery_app.conf.broker_pool_limit = 500
celery_app.conf.worker_prefetch_multiplier = 5
celery_app.conf.worker_send_task_events = False
celery_app.conf.task_send_sent_event = False
# Results are ignored globally; test_request_task opts back in below.
celery_app.conf.task_ignore_result = True
celery_app.conf.task_store_errors_even_if_ignored = True
celery_app.conf.result_expires = 30

@celery_app.task(ignore_result=False)
def test_request_task(serial: int):
    """
    The unit task.
    :param serial: int
    :return: starting time of this task
    """
    logger.info(f"Task {serial} started")
    start_time = time.time()
    TestModel.objects.create(req_number=serial, req_exec=True)
    try:
        resp = requests.get(f"http://mock_app:10034/test_request?serial={serial}")
        TestModel(req_number=serial).update(rec_ack=True)
    except Exception as e:
        logger.error(f"Task {serial} - ERROR: {e}")
    return start_time

@celery_app.task
def test_chord_callback_task(start_times: list[float]):
    """
    Task to ensure I don't execute more than n test_request_task in 1 sec.
    If all test_request_task complete under 1 sec then sleep for the remainder of that sec,
    otherwise do nothing.
    :param start_times: list[float]
    """
    now = time.time()
    logger.info("Group tasks finished")
    time_since_first_task_in_grp = now - start_times[0] if len(start_times) > 0 else 0
    if time_since_first_task_in_grp < 1.0:
        sleep_time_sec = 1.0 - time_since_first_task_in_grp
        logger.info(f"SLEEPING {sleep_time_sec}")
        time.sleep(sleep_time_sec)

@celery_app.task
def test_chain_of_chord_callback_task():
    """
    Just another task that runs after all tasks have finished executing
    :return:
    """
    logger.info("Chain of chord finished")

@celery_app.task
def test_chain_of_chords_task(group_size: int, chain_size: int):
    """
    Make chord with a group of group_size test_request_task to run in parallel and ensure 1 sec completion,
    with callback test_chord_callback_task.
    Make chain_size numbers of above chords and chain them with additional test_chain_of_chord_callback_task.
    :param group_size:
    :param chain_size:
    """
    total_size = group_size * chain_size
    chords = (
        chord(
            group(
                test_request_task.si(serial)
                for serial in range(i, i + group_size)
            ),
            test_chord_callback_task.s()
        ) for i in range(0, total_size, group_size)
    )
    chain_of_chords_with_callback = chain(
        *chords,
        test_chain_of_chord_callback_task.si()
    )
    logger.info("Firing chain of chords")
    chain_of_chords_with_callback.delay()
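
To make the arithmetic in the listing above easier to check in isolation, here is a plain-Python sketch of two pieces of it: how the serial numbers are split into one range per chord, and how long the chord callback would sleep. `batch_serials` and `remaining_sleep` are hypothetical helpers that mirror the logic of test_chain_of_chords_task and test_chord_callback_task; they do not touch Celery.

```python
def batch_serials(group_size: int, chain_size: int) -> list[range]:
    """Serial ranges, one per chord, matching the nested generators."""
    total_size = group_size * chain_size
    return [range(i, i + group_size) for i in range(0, total_size, group_size)]


def remaining_sleep(start_times: list[float], now: float, window: float = 1.0) -> float:
    """Seconds the callback would sleep so the group fills the whole window."""
    # Same fallback as the task: an empty list counts as zero elapsed time.
    elapsed = now - start_times[0] if start_times else 0.0
    # Sleep only for what is left of the window, never a negative amount.
    return max(0.0, window - elapsed)


batches = batch_serials(group_size=5, chain_size=8)
print(len(batches), list(batches[0]), list(batches[-1]))
print(remaining_sleep([10.0, 10.2], now=10.4))
```

One thing this makes visible: the callback measures from start_times[0]. Assuming chord results arrive in group order, that is the first-listed task, which need not be the one that started earliest; min(start_times) would be one way to guard against that.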
This workflow crashes even with group_size=5 and chain_size=8. The resources allocated in docker-compose are below:
celery_worker:
  restart: always
  build: .
  volumes:
    - .:/code
  command:
    - "celery"
    - "-A"
    - "app.worker.celery_app"
    - "worker"
    - "--pool=eventlet"
    - "--concurrency=500"
    - "--loglevel=INFO"
  depends_on:
    - rabbitmq
    - redis
  deploy:
    resources:
      limits:
        cpus: '1.0'
        memory: 2G
      reservations:
        cpus: '0.50'
        memory: 500M
rabbitmq:
  image: rabbitmq:3.11.3-management-alpine
  ports:
    - "5672:5672"
    - "15672:15672"
  environment:
    - RABBITMQ_HOST=rabbitmq
    - RABBITMQ_PORT=5672
    - RABBITMQ_DEFAULT_USER=xxx
    - RABBITMQ_DEFAULT_PASS=Zzzz1234
  deploy:
    resources:
      limits:
        cpus: '0.5'
        memory: 1G
      reservations:
        cpus: '0.50'
        memory: 500M
redis:
  image: redis:7.0.5-alpine
  ports:
    - "6379:6379"
  environment:
    - REDIS_HOST=redis
    - REDIS_PORT=6379
  deploy:
    resources:
      limits:
        cpus: '0.5'
        memory: 100M
      reservations:
        cpus: '0.50'
        memory: 50M
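
For reference, the settings above can be related to the stated load with a little arithmetic. This is a sketch assuming a single worker container and Celery's documented rule that the prefetch limit is worker_prefetch_multiplier times the concurrency; the helper names are hypothetical.

```python
# Values copied from the Celery config and the docker-compose listing.
CONCURRENCY = 500         # --concurrency=500
PREFETCH_MULTIPLIER = 5   # worker_prefetch_multiplier
MAX_N, MAX_M = 100, 30    # upper bounds on tasks per second and on seconds


def prefetch_limit(concurrency: int, multiplier: int) -> int:
    """Messages one worker may reserve ahead of time."""
    return concurrency * multiplier


def total_request_tasks(n: int, m: int) -> int:
    """Number of test_request_task invocations in one full run."""
    return n * m


print(prefetch_limit(CONCURRENCY, PREFETCH_MULTIPLIER))
print(total_request_tasks(MAX_N, MAX_M))
```

Under these assumptions the worker may reserve up to 2500 messages, while the largest run produces 3000 request tasks in total.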