
I'm struggling to get a proper setup for Flask, SQLAlchemy and Celery. I have searched extensively and tried different approaches, but nothing really works: either the application context is missing, the workers won't start, or some other problem comes up. The structure is deliberately general so that I can grow it into a bigger application.

I'm using Flask 0.10.1, SQLAlchemy 1.0 and Celery 3.1.13. My current setup is the following:

app/__init__.py

#Empty

app/config.py

import os
basedir = os.path.abspath(os.path.dirname(__file__))

class Config:

    @staticmethod
    def init_app(app):
        pass

class LocalConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = r"sqlite:///" + os.path.join(basedir, 
                                 "data-dev.sqlite")
    CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'


config = {
    "local": LocalConfig}

app/extensions.py

from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

app/factory.py

from extensions import db, celery
from flask import Flask
from flask import g
from config import config

def create_before_request(app):
    def before_request():
        g.db = db
    return before_request


def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    db.init_app(app)
    celery.config_from_object(config)

    # Register the blueprints

    # Add the before request handler
    app.before_request(create_before_request(app))
    return app

app/manage.py

from factory import create_app

app = create_app("local")

from flask import render_template
from flask import request

@app.route('/test', methods=['POST'])
def task_simple():
    import tasks
    tasks.do_some_stuff.delay()
    return ""

if __name__ == "__main__":
    app.run()

app/models.py

from extensions import db

class User(db.Model):
    __tablename__ = "user"

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(128), unique=True, nullable=False)

app/tasks.py

from extensions import celery
from celery.signals import task_prerun
from flask import g, current_app


@task_prerun.connect
def close_session(*args, **kwargs):
    with current_app.app_context():
       # use g.db
       print g

@celery.task()
def do_some_stuff():
    with current_app.app_context():
       # use g.db
       print g

In the folder app:

  • starting the development webserver with: python.exe manage.py
  • starting the workers with: celery.exe worker -A tasks

I get an import error that doesn't make any sense to me. Should I structure the application differently? In the end I want a fairly basic setup: Flask with the factory pattern, the Flask-SQLAlchemy extension, and a worker that needs to access the database.

Any help is highly appreciated.

The following traceback appears when starting the Celery worker.

Traceback (most recent call last):

  File "[PATH]\scripts\celery-script.py", line 9, in <module>
    load_entry_point('celery==3.1.13', 'console_scripts', 'celery')()

  File "[PATH]\lib\site-packages\celery\__main__.py", line 30, in main
    main()

  File "[PATH]\lib\site-packages\celery\bin\celery.py", line 81, in main
    cmd.execute_from_commandline(argv)

  File "[PATH]\lib\site-packages\celery\bin\celery.py", line 769, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))

  File "[PATH]\lib\site-packages\celery\bin\base.py", line 305, in execute_from_commandline
    argv = self.setup_app_from_commandline(argv)

  File "[PATH]\lib\site-packages\celery\bin\base.py", line 473, in setup_app_from_commandline
    user_preload = tuple(self.app.user_options['preload'] or ())
AttributeError: 'Flask' object has no attribute 'user_options'

UPDATE I changed the code according to the suggestion in the comments. The worker now starts up, but when I test it with a POST request to http://127.0.0.1:5000/test, I get the following traceback:

Traceback (most recent call last):
  File "[PATH]\lib\site-packages\celery\app\trace.py", line 230, in trace_task
    args=args, kwargs=kwargs)

  File "[PATH]\lib\site-packages\celery\utils\dispatch\signal.py", line 166, in send
    response = receiver(signal=self, sender=sender, **named)

  File "[PATH]\app\stackoverflow\tasks.py", line 7, in close_session
    with current_app.app_context():

  File "[PATH]\lib\site-packages\werkzeug\local.py", line 338, in __getattr__
    return getattr(self._get_current_object(), name)

  File "[PATH]\lib\site-packages\werkzeug\local.py", line 297, in _get_current_object
    return self.__local()

  File "[PATH]\lib\site-packages\flask\globals.py", line 34, in _find_app
    raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context exc, exc_info.traceback)))

UPDATE Based on the comments from Maarten, I changed the code. The current working version is at https://gist.github.com/anonymous/fa47834db2f4f3b8b257. Any further improvements or suggestions are welcome.

Glenn Dayton
foobar
  • What is the import error message? Could you put the backtrace here? – mehdix Aug 18 '14 at 11:50
  • The traceback shows an AttributeError that seems to originate in celery-script.py, line 9. If this is the error you are referring to, please also include the content of celery-script.py. Otherwise, as Mehdi Sadeghi says, please include the import error message. – Maarten Aug 19 '14 at 08:34
  • celery-script.py is not part of my project but is the standard script from the celery package that is installed. – foobar Aug 19 '14 at 08:38
  • Try using current_app. It does not throw the exception on my machine. In tasks.py, use "from flask import g, current_app", then use current_app instead of app, and remove the "from manage import app" line. – Maarten Aug 19 '14 at 12:22
  • Thanks. You're right I can start the worker now. I started to test by sending a POST request to: http://127.0.0.1:5000/test. (in the task_simple function manage.py I added a return statement). The task is sent to the worker. Worker Traceback is now above in the description. – foobar Aug 19 '14 at 12:37
  • Let me know how it turns out, I did not have a message queue running to test – Maarten Aug 19 '14 at 13:03
  • See my update above. Traceback says that I don't get the correct application context. – foobar Aug 19 '14 at 14:36

2 Answers


I was off with the current_app advice.

Your celery object needs access to the application context. I found some information online about creating the Celery object with a factory function. The example below was tested without a message broker.

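#factory.py
from celery import Celery
from config import config

# create_app is the Flask application factory from the question,
# assumed to be defined in this same module.

def create_celery_app(app=None):
    # create_app expects a config name such as "local", not the config dict
    app = app or create_app("local")
    celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    # Expose the Flask app so tasks.py can use celery.app.app_context()
    celery.app = app
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Run every task inside the Flask application context
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery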

and in tasks.py:

#tasks.py
from factory import create_celery_app
from celery.signals import task_prerun
from flask import g

celery = create_celery_app()

@task_prerun.connect
def celery_prerun(*args, **kwargs):
    with celery.app.app_context():
        # use g.db
        print(g)

@celery.task()
def do_some_stuff():
    with celery.app.app_context():
        # use g.db
        g.user = "test"
        print(g.user)
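
To see why the task needs a context at all, here is a toy model of the lookup behind current_app, written with only the standard library. It is a sketch, not Flask's or Celery's implementation: ToyApp, get_current_app, ToyTask and make_context_task are hypothetical stand-ins, and the only behaviour borrowed from Flask is that the lookup raises RuntimeError when nothing has been pushed, as in the traceback from the question.

```python
import contextvars
from contextlib import contextmanager

# Holds the "current application" for the running code, or nothing at all.
_current_app = contextvars.ContextVar("current_app")


class ToyApp:
    """Hypothetical stand-in for a Flask app; not Flask's real class."""

    def __init__(self, name):
        self.name = name

    @contextmanager
    def app_context(self):
        # Push this app for the duration of the with-block, then pop it.
        token = _current_app.set(self)
        try:
            yield self
        finally:
            _current_app.reset(token)


def get_current_app():
    """Toy version of the current_app lookup."""
    try:
        return _current_app.get()
    except LookupError:
        raise RuntimeError("working outside of application context")


class ToyTask:
    """Plain task: runs its body with whatever context happens to exist."""

    def __call__(self):
        return get_current_app().name


def make_context_task(app):
    """Return a task class that pushes a context around every call."""

    class ContextTask(ToyTask):
        def __call__(self):
            with app.app_context():
                return ToyTask.__call__(self)

    return ContextTask


app = ToyApp("demo")

try:
    ToyTask()()  # like a worker calling the task with nothing pushed
    outcome = "no error"
except RuntimeError as exc:
    outcome = str(exc)

print(outcome)                     # working outside of application context
print(make_context_task(app)()())  # demo
```

The plain task fails because nothing was pushed in the process that runs it, while the wrapped task succeeds because the wrapper pushes the context before the body runs and pops it afterwards.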

Some links:

Flask pattern for creating a Celery instance with factory function

Application using both application factory and celery

Source for said application's factory.py

Source for application tasks.py

Maarten
  • Hi Maarten, thanks for your answer. With your hint I got it working with a few minor adaptions. So I would like to accept it as correct answer but nonetheless post the full working answer. Do you know what is the usual approach? – foobar Aug 21 '14 at 07:15
  • Happy to hear you were able to fix your code. If you want to accept the answer you can post your working code in an edit in the original question. – Maarten Aug 21 '14 at 07:36
  • Playing around a bit with my current solution and with your suggestion, I'm struggling to understand why the decorator needs to set the base explicitly, e.g. `@celery.task(base=celery.Task) def do_some_stuff(): print g`, whereas plain @celery.task raises the outside-application-context error. – foobar Aug 21 '14 at 08:09
  • Did you use "celery.Task = ContextTask" in your factory function? Or maybe you put your working code somewhere and link to it? – Maarten Aug 21 '14 at 08:16
  • see my updated answer above. I removed the comments before the with statement and I put the print statement within the with statement. that allowed me to remove "base=celery.Task" (on a Linux mint machine) – Maarten Aug 21 '14 at 10:40
  • The problem with that approach is that you then create the application context directly. Which means the whole ContextTask class is kind of useless. – foobar Aug 21 '14 at 17:40
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/59756/discussion-between-maarten-and-foobar). – Maarten Aug 21 '14 at 18:41
  • Are the with statements inside the task functions really needed? For me it seems to work without them. – zwirbeltier Mar 14 '15 at 11:41
  • It is not a question of necessity. See the following links for some examples and reasons to use the with statement: http://stackoverflow.com/questions/3012488/what-is-the-python-with-statement-designed-for and http://effbot.org/zone/python-with-statement.htm – Maarten Mar 18 '15 at 15:11
  • I followed the tutorial, but how come my celery object doesn't see the app? In `with celery.app.app_context():` there is no celery.app. Any reason why? – user805981 Aug 01 '15 at 20:49
  • @Maarten Do you have to do celery.app = app inside of your create_celery_app? – user805981 Aug 01 '15 at 22:11
  • Yes, the celery object has a variable called app which you need to assign the Flask object. in this case the app you pass to the factory function. If you are unsure, please create a separate question so we can keep this one clean :) – Maarten Aug 19 '15 at 10:26
  • How would we go about configuring testing configurations and setting up the celery app for celery tasks in unit test? – user805981 Nov 11 '15 at 13:33
  • There are two levels of redundancies in this answer. Once you have set your celery app to use the `ContextTask` class, each instance of a task will automatically push a Flask context upon being called. You don't need those explicit calls to `celery.app.app_context()` in `@task_prerun.connect` and `@task`. You're already in a context! – Michael Ekoka Jun 06 '23 at 07:59

Here is a solution that works with the Flask application factory pattern and also creates Celery tasks with context, without needing to use app.app_context() explicitly in the tasks. In my app it is really tricky to get hold of the app object while avoiding circular imports, but this solves it. It also works with Celery 4.2, the latest version at the time of writing.

Structure:

repo_name/
    manage.py
    base/
    base/__init__.py
    base/app.py
    base/runcelery.py
    base/celeryconfig.py
    base/utility/celery_util.py
    base/tasks/workers.py

So base is the main application package in this example. In the base/__init__.py we create the celery instance as below:

from celery import Celery
celery = Celery('base', config_source='base.celeryconfig')
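
The reason this sidesteps circular imports can be sketched with a two-phase toy, using only the standard library. ToyCelery, bind and the dict-based app are hypothetical stand-ins chosen for illustration, not Celery or Flask APIs; the point is only that the shared object exists before any app does and gets attached later by the factory.

```python
class ToyCelery:
    """Hypothetical stand-in for an object that is created before the app."""

    def __init__(self, name):
        self.name = name
        self.app = None  # filled in later by bind()

    def bind(self, app):
        # Second phase: attach the app once a factory has built it.
        self.app = app


# Phase 1: module level, like base/__init__.py. No app is needed here, so
# task modules can import `celery` without importing the app factory.
celery = ToyCelery("base")


def create_app(name):
    """Toy factory, like base/app.py: build the app, then bind `celery`."""
    app = {"name": name}  # a dict stands in for the Flask app
    celery.bind(app)
    return app


bound_before = celery.app is not None
app = create_app("demo")
bound_after = celery.app is app
print(bound_before, bound_after)  # False True
```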

The base/app.py file contains the Flask app factory create_app. Note the init_celery(app, celery=celery) call inside it:

from flask import Flask

from base import celery
from base.utility.celery_util import init_celery

# register_extensions, register_blueprints, register_errorhandlers and
# register_app_context_processors are this app's own helpers, not shown here.

def create_app(config_obj):
    """An application factory, as explained here:
    http://flask.pocoo.org/docs/patterns/appfactories/.
    :param config_obj: The configuration object to use.
    """
    app = Flask('base')
    app.config.from_object(config_obj)
    init_celery(app, celery=celery)
    register_extensions(app)
    register_blueprints(app)
    register_errorhandlers(app)
    register_app_context_processors(app)
    return app

Moving on to base/runcelery.py contents:

from flask.helpers import get_debug_flag
from base.settings import DevConfig, ProdConfig
from base import celery
from base.app import create_app
from base.utility.celery_util import init_celery
CONFIG = DevConfig if get_debug_flag() else ProdConfig
app = create_app(CONFIG)
init_celery(app, celery)

Next, the base/celeryconfig.py file (as an example):

# -*- coding: utf-8 -*-
"""
Configure Celery. See the configuration guide at ->
http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
"""

## Broker settings.
broker_url = 'pyamqp://guest:guest@localhost:5672//'
broker_heartbeat = 0

# List of modules to import when the Celery worker starts.
imports = ('base.tasks.workers',)

## Send task state and results back over the broker (RPC result backend).
result_backend = 'rpc'
#result_persistent = False

accept_content = ['json', 'application/text']

result_serializer = 'json'
timezone = "UTC"

# define periodic tasks / cron here
# beat_schedule = {
#    'add-every-10-seconds': {
#        'task': 'workers.add_together',
#        'schedule': 10.0,
#        'args': (16, 16)
#    },
# }

Now define the init_celery in the base/utility/celery_util.py file:

# -*- coding: utf-8 -*-

def init_celery(app, celery):
    """Add flask app context to celery.Task"""
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
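
One way to check this function without a broker or a real Flask app is to feed it small stand-ins, as in the sketch below. FakeApp, FakeBaseTask, FakeCelery and ReportDepth are hypothetical test doubles invented for this example; they only mimic the two things init_celery touches, namely celery.Task and app.app_context(). It says nothing about how real Celery dispatches tasks.

```python
from contextlib import contextmanager


def init_celery(app, celery):
    """Same function as in base/utility/celery_util.py."""
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask


class FakeApp:
    """Hypothetical stand-in for the Flask app; records context pushes."""

    def __init__(self):
        self.depth = 0   # how many contexts are active right now
        self.pushes = 0  # how many contexts were ever entered

    @contextmanager
    def app_context(self):
        self.depth += 1
        self.pushes += 1
        try:
            yield
        finally:
            self.depth -= 1


class FakeBaseTask:
    """Hypothetical stand-in for celery.Task: calling it runs self.run()."""

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


class FakeCelery:
    """Hypothetical stand-in for the Celery app; only exposes .Task."""

    Task = FakeBaseTask


app = FakeApp()
celery = FakeCelery()
init_celery(app, celery)


class ReportDepth(celery.Task):
    def run(self):
        # Report whether a context is active while the task body runs.
        return app.depth


depth_inside = ReportDepth()()
print(depth_inside, app.depth, app.pushes)  # 1 0 1
```

The task body sees one active context, and none is left over once the call returns.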

For the workers in base/tasks/workers.py:

from flask import g  # used by do_some_stuff below
from flask_security.utils import config_value, send_mail

from base import celery as celery_app
from base.bp.users.models.user_models import User

@celery_app.task
def send_welcome_email(email, user_id, confirmation_link):
    """Background task to send a welcome email with flask-security's mail.
    You don't need to use with app.app_context() as Task has app context.
    """
    user = User.query.filter_by(id=user_id).first()
    print(f'sending user {user} a welcome email')
    send_mail(config_value('EMAIL_SUBJECT_REGISTER'),
              email,
              'welcome', user=user,
              confirmation_link=confirmation_link) 

@celery_app.task
def do_some_stuff():
    print(g)

Then start celery beat and the celery worker in two separate command prompts, both from inside the repo_name folder.

In one prompt run celery -A base.runcelery:celery beat, and in the other run celery -A base.runcelery:celery worker.

Finally, trigger the task that needs the Flask context. It should now run without the application-context error.

Bob Jordan