13

Is it possible to dynamically add periodic tasks to Celery?

I'm using Flask, not Django, and I'm building an app that is supposed to allow users to define recurring tasks through a web interface.

I've tried using Periodic Tasks from Celery 4.1, but to add new tasks I have to stop the Celery server, change the config (even if it's done through Python), and start it again. Maybe there's a way to dynamically load the config (without having to restart it)?

I've considered having a crontab that restarts the Celery service every 5 minutes, but that seems highly unnatural, not least because the whole reason I wanted to use Celery was to avoid crontab.

Does anyone have some light to shed on this?

PS: I'm aware of another similar question, but it's from 2012. I was hoping things had changed since then, namely with the introduction of beat in v4.1.

ggaspar
  • 315
  • 2
  • 11
  • Assuming you're talking about adding new task implementations to existing running workers, what is the reason behind this? You should be using a broker to send jobs to the workers, not manually restarting the workers – Jason Dec 06 '17 at 10:55
  • a user of my application should be able to load a script and define a schedule for its execution. Such a task is not to be executed only once (which is the scenario you describe, IIUC) but, for example, every 5 minutes. – ggaspar Dec 06 '17 at 11:02
  • manually restarting the workers is something I really want to avoid – ggaspar Dec 06 '17 at 11:07
  • it doesn't need to be a new task implementation. I can easily define a generic, parameterizable task that would call whatever function it receives as a parameter – ggaspar Dec 06 '17 at 11:15
  • and now that I think about it, maybe the best solution will be to pre-define some generic periodic tasks that retrieve from a DB each function that must be executed at that given beat. But I would still like to know if it's possible to add periodic tasks at runtime. – ggaspar Dec 06 '17 at 11:17
  • looks like it has been fixed https://github.com/celery/celery/pull/3958 – Jason Dec 06 '17 at 13:00
  • Thanks! Yes, that's exactly what I need, but it will only be available in v4.1.1. ATM I prefer to use the release version, 4.1, so I'll implement the solution I wrote above: I pre-define a few "meta tasks" with a few relevant periods (every 1s, 1m, 5m, 1h, 1d), and each task queries a DB to know which functions to execute (see the sketch after these comments). – ggaspar Dec 06 '17 at 13:38
  • Sounds workable in the meantime. Hopefully the point release will be coming out soon. – Jason Dec 06 '17 at 14:00
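
A minimal sketch of that "meta task" workaround, assuming a hypothetical get_functions_due(period) helper that looks up the user-defined functions stored in the DB (neither the helper nor these schedule names are from the original thread):

from celery import Celery
from celery.schedules import crontab

app = Celery('meta', broker='redis://localhost:6379/0')

@app.task
def run_due_functions(period):
    # get_functions_due is a hypothetical DB helper returning
    # (callable, args, kwargs) tuples registered for this period
    for func, args, kwargs in get_functions_due(period):
        func(*args, **kwargs)

# One pre-defined periodic "meta task" per supported period; these are
# registered once at startup, so beat never needs to be restarted.
app.conf.beat_schedule = {
    'meta-every-minute': {
        'task': 'meta.run_due_functions',
        'schedule': crontab(),          # every minute
        'args': ('1m',),
    },
    'meta-every-hour': {
        'task': 'meta.run_due_functions',
        'schedule': crontab(minute=0),  # top of every hour
        'args': ('1h',),
    },
}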

2 Answers

4

This works with Celery 4.0.1+, Python 2.7, and Redis:

from celery import Celery
from celery.schedules import crontab  # needed by add_task below
import os
import logging

logger = logging.getLogger(__name__)
current_module = __import__(__name__)

CELERY_CONFIG = {
    'CELERY_BROKER_URL':
        'redis://{}/0'.format(os.environ.get('REDIS_URL', 'localhost:6379')),
    'CELERY_TASK_SERIALIZER': 'json',
}

celery = Celery(__name__, broker=CELERY_CONFIG['CELERY_BROKER_URL'])
celery.conf.update(CELERY_CONFIG)

I define a job in the following way:

job = {
    'task': 'my_function',                # name of a predefined function
    'schedule': {'minute': 0, 'hour': 0}, # crontab schedule
    'args': [2, 3],
    'kwargs': {}
}

I then define a decorator like this:

def add_to_module(f):
    # register the function on the current module under a mangled name
    setattr(current_module, 'tasks_{}__'.format(f.__name__), f)
    return f

My task is:

@add_to_module
def my_function(x, y, **kwargs):
    return x + y

Then add a function that registers the task on the fly:

def add_task(job):
    logger.info("Adding periodic job: %s", job)
    if not (isinstance(job, dict) and 'task' in job):
        logger.error("Job %s is ill-formed", job)
        return False
    celery.add_periodic_task(
        crontab(**job.get('schedule', {'minute': 0, 'hour': 0})),
        get_from_module(job['task']).s(
            *job.get('args', []),
            **job.get('kwargs', {})
        ),
        name=job.get('name'),
        expires=job.get('expires')
    )
    return True


def get_from_module(f):
    return getattr(current_module, 'tasks_{}__'.format(f))

After this, you can expose the add_task function through a URL endpoint and use it to create periodic tasks out of functions in your current module.
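
For example, a minimal Flask endpoint wiring this up could look like the sketch below (the /tasks route and its request handling are my assumptions, not part of the original answer):

from flask import Flask, request, jsonify

flask_app = Flask(__name__)

@flask_app.route('/tasks', methods=['POST'])
def create_task():
    # expects a JSON body shaped like the `job` dict above
    job = request.get_json()
    if add_task(job):
        return jsonify({'status': 'created'}), 201
    return jsonify({'status': 'ill-formed job'}), 400

Keep in mind that add_periodic_task only updates the schedule of the app instance in the current process, so the entry still has to reach the beat process for it to actually run.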

ananth krishnan
  • 249
  • 2
  • 5
4

For this purpose you can use RedBeat. From the RedBeat GitHub README:

RedBeat is a Celery Beat Scheduler that stores the scheduled tasks and runtime metadata in Redis.
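
Before creating entries, Celery Beat has to be pointed at the RedBeat scheduler. A minimal configuration sketch, assuming a local Redis and that the app lives in the tasks module used below:

# tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')
app.conf.redbeat_redis_url = 'redis://localhost:6379/1'  # where RedBeat stores entries

# start beat with the RedBeat scheduler:
#   celery beat -A tasks -S redbeat.RedBeatScheduler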

For task creation:

import tasks  # module where the Celery app and tasks are defined
from celery.schedules import schedule
from redbeat import RedBeatSchedulerEntry as Entry

repeat = schedule(run_every=60)  # how often to run, here every 60 seconds
entry = Entry(f'urlCheck_{key}', 'tasks.urlSpeed', repeat,
              args=['GET', url, timeout, key], app=tasks.app)
entry.save()
entry.key

To delete a task:

import tasks  # module where the Celery app and tasks are defined
from redbeat import RedBeatSchedulerEntry as Entry

entry = Entry.from_key(key, app=tasks.app)  # key from the previous step
entry.delete()

And there is an example you can use: https://github.com/hamedsh/redBeat_example

hamSh
  • 1,163
  • 1
  • 13
  • 27