I have two files:
app.py
from flask import Flask
from flask_restful import Api
from celery import Celery
from resources.item import Item, ItemList, ItemInsert
from db import db
# Create the Flask application object for this module.
app = Flask(__name__)
# Configuration values are empty strings here -- presumably placeholders
# for the real database / broker URLs (TODO confirm before running).
app.config["SQLALCHEMY_DATABASE_URI"] = ""
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config['CELERY_BROKER_URL'] = ''
app.config['CELERY_RESULT_BACKEND'] = ''
# The Celery instance is built from the app's name and the broker URL only.
# NOTE(review): CELERY_RESULT_BACKEND is set above but is not passed to
# Celery() here -- confirm whether a result backend is actually needed.
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
app.secret_key = ""
# Flask-RESTful wrapper used below to register the resource classes.
api = Api(app)
@app.before_first_request
def create_tables():
    """Create the database tables by calling ``db.create_all()``.

    Registered on the app through the ``before_first_request`` decorator.
    """
    db.create_all()
# Register the REST resources imported from resources.item.
# NOTE(review): ItemList and ItemInsert are both mounted on "/items" --
# presumably they handle different HTTP methods; confirm against item.py.
api.add_resource(ItemList,"/items")
api.add_resource(Item,"/item/<int:id>")
api.add_resource(ItemInsert,"/items")
@celery.task
def update_row(data):
    """Celery task stub: accepts ``data`` and currently does nothing."""
    return None
if __name__ == "__main__":
    # Only when this file is executed directly: attach the shared ``db``
    # object to the app, then start the development server.
    db.init_app(app)
    app.run(port=5000, debug=True)
item.py
from flask_restful import Resource, reqparse
from flask import request
from models.item import ItemModel
class ItemInsert(Resource):
    """REST resource whose POST handler queues the ``update_row`` task."""

    def post(self):
        """Queue ``update_row`` with a 3-second countdown and reply with 201.

        NOTE(review): ``update_row``, ``data`` and ``item`` are neither
        defined nor imported in this snippet. ``update_row`` is defined in
        app.py, which itself imports this module, so a module-level import
        of it here would be circular.
        """
        async_result = update_row.apply_async(args=(data,), countdown=3)
        return item.json(), 201
As you can see, in app.py
I have imported the resource classes from item.py.
However, the call to my Celery task function, i.e. update_row,
in item.py
is now left hanging: I cannot import it from app.py, because that would result in a cyclic import. Is there any solution?