
Some functions should run asynchronously on the web server. Sending emails or data post-processing are typical use cases.

What is the best (or most pythonic) way to write a decorator function to run a function asynchronously?

My setup is a common one: Python, Django, Gunicorn or Waitress, and AWS EC2 running standard Linux.

For example, here's a start:

from threading import Thread

def postpone(function):
    def decorator(*args, **kwargs):
        t = Thread(target=function, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()
    return decorator

Desired usage:

@postpone
def foo():
    pass #do stuff
tomcounsell
  • Look at this post too: http://stackoverflow.com/questions/573618/django-set-up-a-scheduled-job. For a scheduled job, choose a cron-based solution; for scheduled jobs plus asynchronous tasks, choose Celery. I started with https://github.com/tivix/django-cron before migrating to Celery recently. – Guillaume Vincent Aug 24 '13 at 17:27
  • Thanks for all the answers so far; however, Celery requires quite a bit of overhead (installing the app, creating a db for it). So while Celery is a _solution_, it doesn't _answer_ my question about writing a standalone decorator to multithread a function. – tomcounsell Aug 24 '13 at 18:02

4 Answers


I've continued using this implementation at scale and in production with no issues.

Decorator definition:

from threading import Thread

def start_new_thread(function):
    def decorator(*args, **kwargs):
        t = Thread(target=function, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()
    return decorator

Example usage:

@start_new_thread
def foo():
    pass  # do stuff

Over time, the stack has been updated and migrated without failure.

Originally Python 2.4.7, Django 1.4, Gunicorn 0.17.2, now Python 3.6, Django 2.1, Waitress 1.1.

If the decorated function uses any database transactions, Django will create a new connection for the new thread, and it needs to be closed manually:

from django.db import connection

@start_new_thread
def foo():
    # do stuff
    connection.close()
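
A variation on the above (my sketch, not part of the original answer) closes the thread's connection in the wrapper itself, so individual functions don't have to remember to do it. `django.db.connection` is thread-local, so calling `close()` inside the new thread only affects that thread's connection:

from threading import Thread
from django.db import connection

def start_new_thread(function):
    def decorator(*args, **kwargs):
        def run():
            try:
                function(*args, **kwargs)
            finally:
                # close this thread's DB connection so it isn't leaked
                connection.close()
        t = Thread(target=run)
        t.daemon = True
        t.start()
    return decorator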
tomcounsell
  • I am also running this same implementation in production with no issues, and the best part is that it works with uWSGI without any major performance issues. – Deepak Jun 24 '15 at 08:26
  • Do you limit the number of threads spawned? Is it possible for someone to thread bomb your code? – CadentOrange Feb 09 '16 at 17:21
  • I have not used this in scenarios where more than just a couple threads can be created in a single web request. I have no reason to throw a function that uses this decorator inside a loop. It'd be better to simply wrap that logic into a single function that runs in one additional thread. The goal here is to postpone any processing that is not vital to returning the web request. – tomcounsell Feb 13 '16 at 16:00
  • Note that this will leak database connections, as Django will create a new db connection per thread and you are responsible for closing it. – Chronial Jan 12 '17 at 01:34
  • Testing confirms that @Chronial is correct. If your function performs a database transaction (a read is what I tested), then a new connection is created. When the thread terminates, the connection remains, and after 80 connections Postgres rejects any further connections from being created. – oden Jan 15 '17 at 09:30
  • Would it be safe to use this when your view function has a couple of `myModel.Objects.create()` calls? – Algorithmatic Mar 16 '17 at 22:11
  • When the server shuts down and your postponed function has not run, or is part way through running, it will simply be aborted. You need to call the `join` method of the new thread from the main thread, and there isn't a good way to do that. This is one of the reasons why people use Celery etc. – spookylukey Aug 02 '17 at 16:17
  • @spookylukey what are the implications if it is simply aborted? – Neil Oct 24 '20 at 17:24
  • @Neil - that depends entirely on the job and what it does - whether it was important work, whether it uses database transactions or not, whether it will leave something in an inconsistent state, whether you have a mechanism to detect that and retry without issues, etc. – spookylukey Oct 27 '20 at 08:03
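
Regarding spookylukey's shutdown concern, a partial workaround (my sketch, not from this thread; a hard kill such as SIGKILL or a worker timeout will still abort threads) is to track started threads and join them in an atexit hook, which runs before daemon threads are terminated on a normal interpreter exit:

import atexit
from threading import Thread

_background_threads = []

def postpone(function):
    def decorator(*args, **kwargs):
        t = Thread(target=function, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()
        _background_threads.append(t)
    return decorator

@atexit.register
def _join_background_threads():
    # give in-flight work a bounded chance to finish before exit
    for t in _background_threads:
        t.join(timeout=30)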

Celery is an asynchronous task queue/job queue. It's well documented and perfect for what you need. I suggest you start with its documentation.
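
For illustration (my sketch, not part of this answer; the broker URL, module name, and task body are placeholders), a minimal Celery task looks like this:

# tasks.py
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379/0')

@app.task
def send_welcome_email(user_id):
    # slow work happens in a worker process, outside the request cycle
    ...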

Glyn Jackson

The most common way to do asynchronous processing in Django is to use Celery and django-celery.
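
As a sketch of the calling side (assuming a task like the hypothetical send_welcome_email above has been defined), a view enqueues the task instead of running it inline:

# views.py
from django.http import HttpResponse
from tasks import send_welcome_email

def signup(request):
    user_id = 42  # placeholder: normally taken from the created user
    send_welcome_email.delay(user_id)  # returns immediately; a worker runs it
    return HttpResponse("OK")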

Thomas Orozco

tomcounsell's approach works well if there are not too many incoming jobs. If many long-running jobs are launched in a short period of time, spawning a lot of threads, the main process will suffer. In that case, you can use a thread pool with a coroutine:

# in my_utils.py

from concurrent.futures import ThreadPoolExecutor

MAX_THREADS = 10


def run_thread_pool():
    """
    Note that this is not a normal function, but a coroutine.
    All jobs are enqueued first before executed and there can be
    no more than 10 threads that run at any time point.
    """
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        while True:
            func, args, kwargs = yield
            executor.submit(func, *args, **kwargs)


pool_wrapper = run_thread_pool()

# Advance the coroutine to the first yield (priming)
next(pool_wrapper)
# in views.py

from my_utils import pool_wrapper

def job(*args, **kwargs):
    # do something
    pass

def handle(request):
    # make args and kwargs for the job
    args, kwargs = (), {}
    pool_wrapper.send((job, args, kwargs))
    # return a response
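
If you prefer the decorator interface from the question, the same bounded-pool idea can back a decorator directly. This is my sketch rather than part of the answer, with the pool size as an assumed constant:

from concurrent.futures import ThreadPoolExecutor
from functools import wraps

_executor = ThreadPoolExecutor(max_workers=10)

def postpone(function):
    @wraps(function)
    def wrapper(*args, **kwargs):
        # submit() enqueues the call; at most 10 threads run at once
        return _executor.submit(function, *args, **kwargs)
    return wrapper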