0

I have two files:

app.py

from flask import Flask
from flask_restful import Api
from celery import Celery
from resources.item import Item, ItemList, ItemInsert
from db import db

app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = ""
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

app.config['CELERY_BROKER_URL'] = ''
app.config['CELERY_RESULT_BACKEND'] = ''

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

app.secret_key = ""
api = Api(app)

@app.before_first_request
def create_tables():
    db.create_all()

api.add_resource(ItemList,"/items")
api.add_resource(Item,"/item/<int:id>")
api.add_resource(ItemInsert,"/items")

@celery.task
def update_row(data):
    pass

if __name__ == "__main__":
    db.init_app(app)
    app.run(debug=True,port=5000)

item.py

from flask_restful import Resource, reqparse
from flask import request
from models.item import ItemModel

class ItemInsert(Resource):
    def post(self):
        file_task = update_row.apply_async((data,), countdown=3)
        return item.json(), 201

As you can see in app.py I have imported classes from item.py, however now my celery (task) function call i.e update_row from item.py is left hanging, since I cannot import from app.py as it will result in a cyclic import. Is there any solution?

rfkortekaas
  • 6,049
  • 2
  • 27
  • 34
radix
  • 198
  • 1
  • 1
  • 12

2 Answers2

1

With simple project, you could implement the tasks inside app.py as you're doing for now. But with more complicated project, it's better to move the tasks definition into a separated package so that it could mitigate the cyclic import.

Like so:

App and celery configutation

**# app.py**
# App & celery
# ...

Tasks definitions

**# tasks.py**
from project_name.app import celery

@celery.task
def update_row(data):
    pass

API

**# resources/item.py**
from project_name.tasks import update_row
# ...

Separate the tasks into another package (tasks package, which is auto discovered by Celery) could help you to prevent cyclic import and also good to maintain the code.


But if you're still want to use the current approach, to prevent cyclic import, you could import it dynamically when calling API:

**# resources/item.py**
# ...
class ItemInsert(Resource):
    def post(self):
        from project_name.app import update_row
        file_task = update_row.apply_async((data,), countdown=3)
        return item.json(), 201
Toan Quoc Ho
  • 3,266
  • 1
  • 14
  • 22
  • the line 'from project_name.tasks import update_row' throws an error saying "no module name flaskapp" – radix Jan 16 '21 at 08:05
  • It's also depend on your PYTHON_PATH, maybe it's just `from tasks import *`, or you could set your project into PYTHON_PATH so that you could achieve it – Toan Quoc Ho Jan 16 '21 at 10:02
1

celery_app.task is a decorator, so just a regular Python function. It works in the following way: it takes your function, registers it in the celery app and returns a wrapper object with methods delay, apply_async etc. And you can always get a registered task from celery_app.tasks dictionary by its name. Another trick to avoid circular imports is in storing celery_app reference as an attribute of flask_app, and inside request context you can always get current flask app from flask.current_app

app.py

from tasks import register_tasks
...
app = Flask(__name__)
...
app.celery = Celery(app.name, ...)
register_tasks(app.celery)

tasks.py

def update_row(data):
    pass

def register_tasks(celery_app):
    celery_app.task(update_row, name="update_row")

views.py

from flask import current_app

class ItemInsert(Resource):
    def post(self):
        update_row = current_app.celery.tasks["update_row"]
        file_task = update_row.apply_async((data,), countdown=3)
        return item.json(), 201

UPD: indeed the most canonical way is to use autodiscovery of tasks:

myapp/tasks.py

from celery import shared_task

@shared_task
def update_row(data):
    pass

myapp/app.py

celery_app = Celery(...)
celery_app.set_default()
celery_app.autodiscover_tasks(["myapp"], force=True)

myapp/views.py

from .tasks import update_row

def index_view():
    update_row.delay(...)
inf581
  • 612
  • 1
  • 7
  • 9