
I have a Flask REST API which leverages Celery for running async requests.

The idea is that an async=1 query parameter indicates the request should be processed asynchronously (returning a task ID immediately which the client will use later).

At the same time I want to stop accepting new tasks when there are too many waiting to be processed.

The code below works, but accepting_new_tasks() takes ~2 seconds which is way too slow.

Is there a Celery config option (or something similar) that limits the number of waiting tasks, or a faster way to get the number of waiting tasks?

import math

from celery import Celery
from flask import abort, Flask, jsonify, request


flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")


@flask_app.route("/")
def home():
    async_ = request.args.get("async")
    settings = request.args.get("settings")

    if async_:
        if not accepting_new_tasks(celery_app):
            return abort(503)

        task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
        return jsonify({"taskId": task.id})

    return jsonify({})


def accepting_new_tasks(celery_app):
    inspector = celery_app.control.inspect()
    nodes_stats = inspector.stats()
    nodes_reserved = inspector.reserved()

    workers = 0
    for stats in nodes_stats.values():
        workers += stats["pool"]["max-concurrency"]

    waiting_tasks = 0
    for reserved in nodes_reserved.values():
        waiting_tasks += len(reserved)

    return waiting_tasks < math.ceil(workers / 3)
Dušan Maďar
  • Can you add some details? For what is it? I mean, what problems do you want to solve? – Danila Ganchar Mar 29 '19 at 10:53
  • @DanilaGanchar I want to process a large amount of data, which can take up to hours depending on the passed `settings`. – Dušan Maďar Mar 29 '19 at 11:57
  • ok. let's imagine all workers are busy right now (at `12:12:12.52....`). You returned `abort(503)`, but at `12:12:12.53....` some worker became available. Is that correct behavior? I mean, do you have any time ranges for waiting? – Danila Ganchar Mar 29 '19 at 12:20
  • @DanilaGanchar no, no time ranges for waiting. The idea is to check before each request. Anyway, I have solved it by using RabbitMQ management API: https://stackoverflow.com/a/27074594/4183498. – Dušan Maďar Mar 29 '19 at 20:19

1 Answer


Eventually I solved this by querying the RabbitMQ management API, as https://stackoverflow.com/a/27074594/4183498 points out.

import math

from celery import Celery
from flask import abort, Flask, jsonify, request
from requests import get
from requests.auth import HTTPBasicAuth


flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")


def get_workers_count():
    inspector = celery_app.control.inspect()
    nodes_stats = inspector.stats()

    workers = 0
    for stats in nodes_stats.values():
        workers += stats["pool"]["max-concurrency"]

    return workers


# Cached once at import time so the slow inspect() call stays off the request path.
WORKERS_COUNT = get_workers_count()


@flask_app.route("/")
def home():
    async_ = request.args.get("async")
    settings = request.args.get("settings")

    if async_:
        if not accepting_new_tasks(celery_app):
            return abort(503)

        task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
        return jsonify({"taskId": task.id})

    return jsonify({})


def accepting_new_tasks(celery_app):
    auth = HTTPBasicAuth("guest", "guest")
    response = get(
        "http://localhost:15672/api/queues/my_vhost/celery",
        auth=auth,
    )
    waiting_tasks = response.json()["messages"]
    return waiting_tasks < math.ceil(WORKERS_COUNT / 3)
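
For reference, the queue object returned by `GET /api/queues/<vhost>/<queue>` also breaks the counters down into `messages_ready` (waiting for delivery) and `messages_unacknowledged` (already picked up by a worker but not yet acked), so you can count only tasks that are genuinely still waiting. A minimal sketch of that check against an illustrative sample of the payload (trimmed to the counter fields; the real response has many more keys):

```python
import math

# Illustrative sample of the counter fields from GET /api/queues/<vhost>/<queue>.
sample_queue_info = {
    "messages": 7,                 # total = ready + unacknowledged
    "messages_ready": 5,           # waiting to be delivered to a worker
    "messages_unacknowledged": 2,  # delivered to a worker, not yet acked
}


def accepting_new_tasks(queue_info, workers_count):
    # Count only tasks still waiting for delivery; tasks a worker has
    # already picked up (unacknowledged) are no longer "waiting".
    waiting_tasks = queue_info["messages_ready"]
    return waiting_tasks < math.ceil(workers_count / 3)


print(accepting_new_tasks(sample_queue_info, workers_count=12))  # → False (5 >= ceil(12/3))
print(accepting_new_tasks(sample_queue_info, workers_count=24))  # → True (5 < ceil(24/3))
```

Whether `messages` or `messages_ready` is the right counter depends on whether prefetched-but-unfinished tasks should count against the limit.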