12

I have an application with Blueprints and Celery; the code is here:

config.py

import os
from celery.schedules import crontab
basedir = os.path.abspath(os.path.dirname(__file__))

class Config:
    SECRET_KEY = os.environ.get('SECRET_KEY') or ''
    SQLALCHEMY_COMMIT_ON_TEARDOWN = True
    RECORDS_PER_PAGE = 40
    SQLALCHEMY_DATABASE_URI = ''
    CELERY_BROKER_URL = ''
    CELERY_RESULT_BACKEND = ''
    CELERY_RESULT_DBURI = ''
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {}

    @staticmethod
    def init_app(app):
        pass


class DevelopmentConfig(Config):
    DEBUG = True
    WTF_CSRF_ENABLED = True
    APP_HOME = ''
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://...'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }


class TestConfig(Config):
    DEBUG = True
    WTF_CSRF_ENABLED = False
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'


class ProdConfig(Config):
    DEBUG = False
    WTF_CSRF_ENABLED = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...celery'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://.../celery'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }

config = {
    'development': DevelopmentConfig,
    'default': ProdConfig,
    'production': ProdConfig,
    'testing': TestConfig,
}


class AppConf:
    """
    Class to store current config even out of context
    """
    def __init__(self):
        self.app = None
        self.config = {}

    def init_app(self, app):
        if hasattr(app, 'config'):
            self.app = app
            self.config = app.config.copy()
        else:
            raise TypeError

__init__.py:

import os

from flask import Flask
from celery import Celery
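from config import config, AppConf

# Module-level holder; tasks.py imports it with "from app import make_celery, app_conf"
app_conf = AppConf()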

def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    config[config_name].init_app(app)
    app_conf.init_app(app)

    # Connect to Staging view
    from staging.views import staging as staging_blueprint
    app.register_blueprint(staging_blueprint)

    return app


def make_celery(app=None):
    app = app or create_app(os.getenv('FLASK_CONFIG') or 'default')
    # Flask exposes settings through the dict-like app.config
    celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

tasks.py:

from app import make_celery, app_conf

cel = make_celery(app_conf.app)

@cel.task
def send_realm_to_fabricdb(realm, form):
    pass  # some actions...

And here is the problem. The "staging" Blueprint uses the task send_realm_to_fabricdb, so it does: from tasks import send_realm_to_fabricdb. When I just run the application, everything works.

But when I try to run Celery with celery -A app.tasks worker -l info --beat, it reaches cel = make_celery(app_conf.app) in tasks.py, gets app=None, and tries to create the application again, which means registering the blueprint again. The blueprint imports tasks.py, so I end up with a circular import.

Could you tell me how to break this cycle? Thanks in advance.

user3319628
  • Howdie, I'm currently dealing with the same issue. Were you able to get this working? – Jimmy Mar 02 '16 at 16:55

4 Answers

8

I don't have the code to try this out, but I think things would work better if you move the creation of the Celery instance out of tasks.py and into the create_app function, so that it happens at the same time the app instance is created.

The argument you give to the Celery worker in the -A option does not need to be the module that holds the tasks; Celery just needs the celery object. For example, you could create a separate starter script, say celery_worker.py, that calls create_app to create app and cel, and then pass it to the worker as -A celery_worker.cel, without involving the blueprint at all.

Hope this helps.

Miguel Grinberg
  • @Miguel Does this mean I am creating two instances of Flask, one in `celery_worker.py` and the other in maybe `manage.py`? I'm having the same issue. – Shulhi Sapli Sep 04 '15 at 10:44
  • @ShulhiSapli Yes, these are two different processes, each has its own application instance. But both should be created in the same way, so they are effectively equivalent (for example, they have the same configuration). The only purpose of the app instance in the Celery worker is to provide a context for code that needs that. You cannot pass `session`, `g`, `request` variables from one process to the other in this way. – Miguel Grinberg Sep 04 '15 at 16:33
2

To solve this error, I create two Flask instances: one for the web app, and another used to initialize the Celery instance.

Like @Miguel said, I have

  • celery_app.py for celery instance
  • manager.py for Flask instance

Each of these two modules has its own Flask instance.

This way I can use celery.task in views and start the Celery worker separately.

einverne
0

Thanks to Bob Jordan; you can find the answer at https://stackoverflow.com/a/50665633/2794539.

Key points:
1. make_celery does two things at once: it creates the Celery app and it makes Celery tasks run within the Flask context. You can split it into two functions, one for each job.
2. The Celery app must be initialized before the blueprints are registered.

gaozhidf
0

Having the same problem, I ended up solving it very easily using shared_task (docs), keeping a single app.py file and not having to instantiate the flask app multiple times.

The original situation that led to the circular import:

from src.app import celery  # src.app is ALSO importing the blueprints which are importing this file which causes the circular import.


@celery.task(bind=True)
def celery_test(self):
    sleep(5)
    logger.info("Task processed by Celery.")

The current code that works fine and avoids the circular import:

from celery import shared_task
# from src.app import celery <- not needed anymore!


@shared_task(bind=True)
def celery_test(self):
    sleep(5)
    logger.info("Task processed by Celery.")

Please mind that I'm pretty new to Celery, so I might be overlooking something important. It would be great if someone more experienced could give their opinion.

Pere Picornell