
I need to design a Redis-driven scalable task scheduling system.

Requirements:

  • Multiple worker processes.
  • Many tasks, but long periods of idleness are possible.
  • Reasonable timing precision.
  • Minimal resource waste when idle.
  • Should use synchronous Redis API.
  • Should work for Redis 2.4 (i.e. no features from upcoming 2.6).
  • Should not use other means of RPC than Redis.

Pseudo-API: schedule_task(timestamp, task_data). Timestamp is in integer seconds.

Basic idea:

  • Listen for upcoming tasks on a list.
  • Put tasks into buckets, one per timestamp.
  • Sleep until the closest timestamp.
  • If a new task appears with a timestamp earlier than the closest one, wake up.
  • Process all upcoming tasks with timestamp ≤ now, in batches (assuming that task execution is fast).
  • Make sure that concurrent workers don't process the same tasks. At the same time, make sure that no tasks are lost if a worker crashes while processing them.
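
To make the bucket idea concrete before mapping it onto Redis, here is a minimal in-memory sketch in Python. It is not a Redis implementation: the class and method names (`BucketScheduler`, `seconds_until_next`, `pop_due`) are made up for illustration, and it ignores concurrency and crash safety entirely.

```python
import heapq
from collections import defaultdict


class BucketScheduler:
    """In-memory model: one bucket of tasks per integer timestamp."""

    def __init__(self):
        self._buckets = defaultdict(list)  # timestamp -> list of task_data
        self._timestamps = []              # min-heap of distinct timestamps

    def schedule_task(self, timestamp, task_data):
        # Only push a timestamp onto the heap the first time it is seen.
        if timestamp not in self._buckets:
            heapq.heappush(self._timestamps, timestamp)
        self._buckets[timestamp].append(task_data)

    def seconds_until_next(self, now):
        """How long a worker may sleep; None means nothing is scheduled."""
        if not self._timestamps:
            return None
        return max(0, self._timestamps[0] - now)

    def pop_due(self, now):
        """Remove and return all tasks with timestamp <= now, oldest first."""
        due = []
        while self._timestamps and self._timestamps[0] <= now:
            ts = heapq.heappop(self._timestamps)
            due.extend(self._buckets.pop(ts))
        return due
```

The open question in the post is how to get the same three operations (add to a bucket, find the closest timestamp, drain due buckets) out of Redis primitives while several workers run them concurrently.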

So far I can't figure out how to fit this in Redis primitives...

Any clues?

Note that there is a similar old question: Delayed execution / scheduling with Redis? In this new question I introduce more details (most importantly, many workers). So far I was not able to figure out how to apply old answers here — thus, a new question.

Alexander Gladysh
  • I'd like to explicitly note that polling of a Redis key in a loop would violate "minimal resource waste when idle" requirement. Workers should sleep when there is nothing to do. – Alexander Gladysh Jun 03 '12 at 07:15
  • Polling with BLPOP/BRPOP blocks until the list has an element (or the timeout expires), and is what most people use for this. You usually block for a few seconds in a loop, and the CPU cost is negligible. You can use Redis pub/sub, but that is bad because tasks will be lost if no worker is listening. – Not_a_Golfer Jun 03 '12 at 10:50
  • @Not_a_Golfer: Things are a little more complicated than BLPOPping a single list. Please note that I need delayed task execution (i.e. task scheduler), not a straightforward task processor. – Alexander Gladysh Jun 03 '12 at 14:23

5 Answers


Here's another solution that builds on a couple of others [1]. It uses the redis WATCH command to remove the race condition without using lua in redis 2.6.

The basic scheme is:

  • Use a redis zset for scheduled tasks and redis queues for ready to run tasks.
  • Have a dispatcher poll the zset and move tasks that are ready to run into the redis queues. You may want more than 1 dispatcher for redundancy but you probably don't need or want many.
  • Have as many workers as you want which do blocking pops on the redis queues.

I haven't tested it :-)

The foo job creator would do:

import json
import time

from redis import Redis

SCHEDULED_ZSET_KEY = 'scheduled_tasks'  # key name is an assumption
redis = Redis()

def schedule_task(queue, data, delay_secs):
    # time.time() is Unix epoch seconds, so daylight saving changes do not
    # affect it, but clock adjustments (e.g. NTP steps) still can.
    run_at = time.time() + delay_secs

    # Zset members must be strings, so the task is serialized to JSON.
    # This is the redis-py >= 3.0 call form: zadd(key, {member: score}).
    member = json.dumps({'queue': queue, 'data': data})
    redis.zadd(SCHEDULED_ZSET_KEY, {member: run_at})

schedule_task('foo_queue', foo_data, 60)

The dispatcher(s) would look like:

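import json
import time

from redis.exceptions import WatchError

while working:
    # redis-py exposes WATCH/MULTI/EXEC through a pipeline object.
    with redis.pipeline() as pipe:
        try:
            pipe.watch(SCHEDULED_ZSET_KEY)
            # After WATCH, commands run immediately until multi() is called.
            results = pipe.zrangebyscore(
                SCHEDULED_ZSET_KEY, 0, time.time(), start=0, num=1)
            if not results:
                pipe.unwatch()
                time.sleep(1)
                continue
            # Assumes members are JSON strings with 'queue' and 'data' fields.
            task = json.loads(results[0])
            pipe.multi()
            pipe.rpush(task['queue'], json.dumps(task['data']))
            pipe.zrem(SCHEDULED_ZSET_KEY, results[0])
            pipe.execute()
        except WatchError:
            # Another dispatcher modified the zset first; retry.
            continue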

The foo worker would look like:

while working:
    # blpop returns a (queue_name, value) pair, or None on timeout.
    item = redis.blpop('foo_queue', POP_TIMEOUT)
    if item:
        _queue, task_data = item
        foo(task_data)

[1] This solution is based on not_a_golfer's, one at http://www.saltycrane.com/blog/2011/11/unique-python-redis-based-queue-delay/, and the redis docs for transactions.

Dan Benamy
  • If it's of any interest to anybody, I've created a Java implementation of the above... fully tested and functional. https://github.com/davidmarquis/redis-scheduler – David M Dec 13 '13 at 21:51

You didn't specify the language you're using. In Python at least, you have at least 3 alternatives for doing this without writing a single line of code yourself.

  1. Celery has an optional redis broker. http://celeryproject.org/

  2. resque is an extremely popular Redis-backed task queue. https://github.com/defunkt/resque

  3. RQ is a simple and small redis based queue that aims to "take the good stuff from celery and resque" and be much simpler to work with. http://python-rq.org/

You can at least look at their design if you can't use them.

But to answer your question - what you want can be done with redis. I've actually written more or less that in the past.

EDIT: As for modeling what you want on redis, this is what I would do:

  1. queuing a task with a timestamp will be done directly by the client - you put the task in a sorted set with the timestamp as the score and the task as the value (see ZADD).

  2. A central dispatcher wakes every N seconds, checks out the first timestamps on this set, and if there are tasks ready for execution, it pushes the task to a "to be executed NOW" list. This can be done with ZREVRANGEBYSCORE on the "waiting" sorted set, getting all items with timestamp<=now, so you get all the ready items at once. pushing is done by RPUSH.

  3. workers use BLPOP on the "to be executed NOW" list, wake when there is something to work on, and do their thing. This is safe since redis is single threaded, and no 2 workers will ever take the same task.

  4. once finished, the workers put the result back in a response queue, which is checked by the dispatcher or another thread. You can add a "pending" bucket to avoid failures or something like that.

so the code will look something like this (this is just pseudo code):

client:

ZADD "new_tasks" <TIMESTAMP> <TASK_INFO>

dispatcher:

while working:
   tasks = ZREVRANGEBYSCORE "new_tasks" <NOW> 0 #this will only take tasks with timestamp lower/equal than now
   for task in tasks:

       #do the delete and queue as a transaction
       MULTI
       RPUSH "to_be_executed" task
       ZREM "new_tasks" task
       EXEC

   sleep(1)

I didn't add the response queue handling, but it's more or less like the worker:

worker:

while working:
   task = BLPOP "to_be_executed" <TIMEOUT>
   if task:
      response = work_on_task(task)
      RPUSH "results" response

EDIT: stateless atomic dispatcher:

while working:

   MULTI
   ZRANGE "new_tasks" 0 0 WITHSCORES   #earliest task (lowest timestamp), not the latest
   ZREMRANGEBYRANK "new_tasks" 0 0
   task = EXEC

   #this is the only risky place - you can solve it by using Lua internally in 2.6
   SADD "tmp" task

   if task.timestamp <= now:
       MULTI
       RPUSH "to_be_executed" task
       SREM "tmp" task
       EXEC
   else:

       MULTI
       ZADD "new_tasks" task.timestamp task
       SREM "tmp" task
       EXEC

   sleep(RESOLUTION)
Not_a_Golfer
  • interesting! you want to actually use lua inside redis? or just connect to redis with lua? the first won't work. – Not_a_Golfer Jun 03 '12 at 14:00
  • Thank you for the references, I'll check them out. Since you have written something similar in the past, do you care to elaborate a bit on the key points of the design? Never mind the language. – Alexander Gladysh Jun 03 '12 at 14:02
  • My scheduler and tasks would be written in Lua. Redis 2.4 does not have Lua scripting. – Alexander Gladysh Jun 03 '12 at 14:04
  • I have not found any information on how to schedule a task execution in resque at a given timestamp. Did I miss something? – Alexander Gladysh Jun 03 '12 at 14:23
  • I've edited my response and added a basic design. BTW what I did was a bit quick-n-dirty - I just delayed tasks in the workers by spawning new threads. – Not_a_Golfer Jun 03 '12 at 14:32
  • Celery has celerybeat for scheduling. But celerybeat does not use Redis; instead it runs a separate process with a loop, sleeping between tasks. This is far from what I have in mind. – Alexander Gladysh Jun 03 '12 at 14:33
  • As for the edit: thank you, but you skipped the cleanup phase, which is the most interesting part here. Assume that there are many schedulers, all polling "new_tasks", and you have to prevent tasks from being executed (or RPUSHed) twice. – Alexander Gladysh Jun 03 '12 at 14:36
  • RQ also does not appear to have any scheduling capabilities. – Alexander Gladysh Jun 03 '12 at 14:39
  • If you need many schedulers you can add a distributed lock when entering this phase (using SETNX and EXPIRE, see https://github.com/ServiceStack/ServiceStack.Redis/wiki/RedisLocks ). this will let the "winner" scheduler take care of cleaning up easily before dispatching. But I would not bother doing it, in terms of speed a single dispatcher can work on tens if not hundreds of thousands of tasks per second. and if it restarts, nothing is lost (you can delete from the sorted set per each dispatching, although it's a bit slower). – Not_a_Golfer Jun 03 '12 at 14:40
  • For speed I would just distribute the load between many schedulers, each with its own queue. For HA I would just make the single scheduler very error resilient and transaction oriented, and put a watchdog to bring it up back if it fails. – Not_a_Golfer Jun 03 '12 at 14:43
  • I would like to avoid using Redis locks if possible. As for why do I need many schedulers — it is not speed, it is reliability that I'm worried about. Otherwise I would not be able to e.g. reboot under load. – Alexander Gladysh Jun 03 '12 at 14:44
  • You can do the delete/dispatch as a transaction (changed the example accordingly) so if the dispatcher is rebooted, the exact state is saved in redis and you can continue from where you left. – Not_a_Golfer Jun 03 '12 at 14:48
  • I do not want to pause processing while machine reboots, that is the point. Note that the transaction that you added would not work in the general case — it is likely that the "to_be_executed" and "new_tasks" are on different Redis servers. – Alexander Gladysh Jun 03 '12 at 14:50
  • then why not locks? anyway, there are several ways of achieving this, another example: a. you can have the dispatcher atomically (in a transaction) get the latest item, put it in a side bucket, and delete it from the queue. this assures no other dispatcher will take it. then the dispatcher can examine it, see if it needs execution or not, and if not return it to "new_tasks" - and of course remove it transactionally from the side bucket. then you can have a watchdog monitor the side bucket for "zombie" tasks and queue them back for execution. – Not_a_Golfer Jun 03 '12 at 14:55
  • Actually - writing the dispatcher's call in Lua on redis2.6 will ensure all this will happen atomically and the dispatchers will be totally stateless! – Not_a_Golfer Jun 03 '12 at 14:55
  • another idea - use a list of new tasks, and then each dispatcher can move tasks to a private "inspection queue" with BRPOPLPUSH. – Not_a_Golfer Jun 03 '12 at 15:10
  • No locks — due to problem with lock expiration. If timeout is too small there is a risk that work is not completed when lock is expired (for example due to network connectivity problems). If too large, then service will be disrupted for too long when lock owner dies. – Alexander Gladysh Jun 03 '12 at 15:29
  • Notes on your new solution: it is probably better to use `ZRANGEBYSCORE "new_tasks" -inf LIMIT 1`. Same for `ZREMRANGEBYRANK`. And if you swap `ZREMRANGEBYRANK` and `SADD`, it would probably be safer. And, again, you skipped the most interesting part: what to do with `tmp`? – Alexander Gladysh Jun 03 '12 at 15:33
  • tmp should be used in the case that a dispatcher died before dispatching the task. you'll have to use a separate thread to poll it and see if there are zombie processes. But I have to say the BRPOPLPUSH approach is much more elegant. it's even mentioned as a solution for reliable queuing in the redis documentation. – Not_a_Golfer Jun 03 '12 at 15:43
  • Did anyone ever use this solution in anger? I'd love to see source code, if so. – crowder May 09 '14 at 22:13
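
The BRPOPLPUSH idea from the comments above can be sketched roughly as follows. This is only an illustration, not code from any of the commenters: the function names and key arguments are hypothetical, and `client` is assumed to be a redis-py client (the `lrem(name, count, value)` argument order is that of redis-py 3.0 or later). BRPOPLPUSH itself has been available since Redis 2.2, so it fits the 2.4 constraint.

```python
def claim_task(client, pending_key, processing_key, timeout=5):
    """Atomically move one task from the pending list to a processing list.

    Returns the task, or None if the timeout expired with nothing to do.
    """
    return client.brpoplpush(pending_key, processing_key, timeout)


def finish_task(client, processing_key, task):
    """Remove one occurrence of a completed task from the processing list."""
    client.lrem(processing_key, 1, task)


def run_once(client, handler, pending_key, processing_key, timeout=5):
    """Claim and process at most one task; return True if one was handled."""
    task = claim_task(client, pending_key, processing_key, timeout)
    if task is None:
        return False
    # If handler raises or the process dies here, the task stays in
    # processing_key, where a watchdog can find it and requeue it.
    handler(task)
    finish_task(client, processing_key, task)
    return True
```

With one processing list per worker, a watchdog only needs to inspect the lists of workers that are known to be dead.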

If you're looking for a ready-made solution in Java, Redisson is right for you. It lets you schedule and execute tasks (with cron-expression support) in a distributed way on Redisson nodes, using the familiar ScheduledExecutorService API and backed by a Redis queue.

Here is an example. First define a task using the java.lang.Runnable interface. Each task can access the Redis instance via an injected RedissonClient object.

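// Package names as in Redisson 3.x.
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;

public class RunnableTask implements Runnable {

    @RInject
    private RedissonClient redissonClient;

    @Override
    public void run() {
        // Runnable.run() cannot declare checked exceptions.
        RMap<String, Integer> map = redissonClient.getMap("myMap");
        long result = 0;
        for (Integer value : map.values()) {
            result += value;
        }
        redissonClient.getTopic("myMapTopic").publish(result);
    }

}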

Now submit it to the ScheduledExecutorService:

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
// Initial delay of 10 minutes, then every 20 minutes.
ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new RunnableTask(), 10, 20, TimeUnit.MINUTES);

future.get();
// or cancel it
future.cancel(true);

Examples with cron expressions:

executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?"));

executorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));

executorService.schedule(new RunnableTask(), CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY));

All tasks are executed on a Redisson node.

Nikita Koksharov

A combined approach seems plausible:

  1. No new task timestamp may be less than current time (clamp if less). Assuming reliable NTP synch.

  2. All tasks go to bucket-lists at keys, suffixed with task timestamp.

  3. Additionally, all task timestamps go to a dedicated zset (key and score — timestamp itself).

  4. New tasks are accepted from clients via separate Redis list.

  5. Loop: Fetch oldest N expired timestamps via zrangebyscore ... limit.

  6. BLPOP with timeout on new tasks list and lists for fetched timestamps.

  7. If got an old task, process it. If new — add to bucket and zset.

  8. Check if processed buckets are empty. If so, delete the list and its entry from the zset. Probably do not check very recently expired buckets, to safeguard against time synchronization issues. End loop.
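
A few of the steps above reduce to small pure helpers, sketched here in Python. The key names (`tasks:bucket:`, `tasks:new`) and function names are assumptions made for the example. Listing the bucket keys before the new-tasks key is also my choice: BLPOP serves the first non-empty key in the order given, so due tasks are then handled before new submissions.

```python
def clamp_timestamp(timestamp, now):
    """Step 1: a task may not be scheduled in the past."""
    return max(int(timestamp), int(now))


def bucket_key(timestamp, prefix="tasks:bucket:"):
    """Step 2: one list per timestamp, keyed by a timestamp suffix."""
    return "%s%d" % (prefix, timestamp)


def blpop_keys(expired_timestamps, new_tasks_key="tasks:new"):
    """Step 6: keys to pass to BLPOP, oldest expired bucket first."""
    keys = [bucket_key(ts) for ts in sorted(expired_timestamps)]
    keys.append(new_tasks_key)
    return keys
```

For example, `blpop_keys([105, 103])` yields the two bucket keys in timestamp order followed by the new-tasks key.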

Critique? Comments? Alternatives?

Alexander Gladysh

Lua

I made something similar to what's been suggested here, but optimized the sleep duration to be more precise. This solution is good if you have few inserts into the delayed task queue. Here's how I did it with a Lua script:

local laterChannel = KEYS[1]
local nowChannel = KEYS[2]
local currentTime = tonumber(KEYS[3])

local first = redis.call("zrange", laterChannel, 0, 0, "WITHSCORES")

if (#first ~= 2)
then
    return "2147483647"
end

local execTime = tonumber(first[2])
local event = first[1]

if (currentTime >= execTime)
then
    redis.call("zrem", laterChannel, event)
    redis.call("rpush", nowChannel, event)
    return "0"
else
    return tostring(execTime - currentTime)
end

It uses two "channels": laterChannel is a ZSET and nowChannel is a LIST. Whenever it's time to execute a task, the event is moved from the ZSET to the LIST. The Lua script responds with how many milliseconds the dispatcher should sleep until the next poll. If the ZSET is empty, sleep forever. If it's time to execute something, do not sleep (i.e. poll again immediately). Otherwise, sleep until it's time to execute the next task.
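
On the dispatcher side, the script's reply has to be turned into a sleep duration. A minimal Python sketch of that conversion follows; the function name and the `units_per_second` parameter are hypothetical, and it assumes the ZSET scores are in milliseconds as described above.

```python
NOTHING_SCHEDULED = "2147483647"  # sentinel the script returns for an empty ZSET


def reply_to_timeout(reply, units_per_second=1000):
    """Convert the script reply into seconds to sleep.

    Returns None when nothing is scheduled (sleep until woken up),
    0.0 when the dispatcher should poll again immediately.
    """
    if isinstance(reply, bytes):
        reply = reply.decode("ascii")
    if reply == NOTHING_SCHEDULED:
        return None
    return float(reply) / units_per_second
```

Returning `None` for the sentinel lets the caller pass the value straight to a blocking wait that treats `None` as "no timeout".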

So what if something is added while the dispatcher is sleeping?

This solution works in conjunction with key space events. You basically need to subscribe to the key of laterChannel and whenever there is an add event, you wake up all the dispatcher so they can poll again.
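
One way to wire up the wake-up inside a single dispatcher process is a `threading.Event` shared between the subscriber thread and the polling loop. The sketch below is an assumption about how that could look, not the code I actually run; `DispatcherWakeup` is a made-up name.

```python
import threading


class DispatcherWakeup:
    """Lets a subscriber thread cut a dispatcher's sleep short."""

    def __init__(self):
        self._event = threading.Event()

    def notify(self):
        """Call from the subscriber when laterChannel gets a new member."""
        self._event.set()

    def sleep(self, timeout):
        """Sleep up to `timeout` seconds (None means until notified).

        Returns True if woken early by notify(), False on timeout.
        """
        woken = self._event.wait(timeout)
        self._event.clear()
        return woken
```

The dispatcher polls again after every `sleep()` call regardless of the return value, so a notification that arrives just before the poll is harmless.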

Then you have another dispatcher that uses the blocking left pop on nowChannel. This means:

  • You can have the dispatcher across multiple instances (i.e. it scales)
  • The polling is atomic so you won't have any race conditions or double events
  • The task is executed by any of the instances that are free

There are ways to optimize this even more. For example, instead of returning "0", you fetch the next item from the zset and return the correct amount of time to sleep directly.

Expiration

If you cannot use Lua scripts, you can use keyspace events for expired keys. Subscribe to the channel and receive the event when Redis expires the key. Then, grab a lock. The first instance to do so will move the task to a list (the "execute now" channel). Then you don't have to worry about sleeps and polling: Redis will tell you when it's time to execute something.

execute_later(timestamp, eventId, event) {
    SET eventId event EXP timestamp
    SET "lock:" + eventId, ""
}

subscribeToEvictions(eventId) {
    // The key eventId has already expired, so deleting the lock key decides the winner.
    var deletedCount = DEL "lock:" + eventId
    if (deletedCount == 1) {
        // move to list
    }
}

This however has its own downsides. For example, if you have many nodes, all of them will receive the event and try to get the lock. But I still think it's overall fewer requests than anything else suggested here.
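
For reference, here is a small Python sketch of the subscriber side: building the channel name and extracting the expired key from a pub/sub message. The helper names are hypothetical. The message shape (a dict with `type`, `channel` and `data`) is what redis-py's pub/sub returns, and Redis must have `notify-keyspace-events` configured to include expired events, otherwise nothing is published.

```python
def expired_channel(db=0):
    """Pub/sub channel on which Redis announces expired keys for one database."""
    return "__keyevent@%d__:expired" % db


def expired_key(message, db=0):
    """Return the expired key name from a pub/sub message, else None."""
    if message is None or message.get("type") not in ("message", "pmessage"):
        return None  # e.g. subscribe confirmations
    channel = message["channel"]
    if isinstance(channel, bytes):
        channel = channel.decode()
    if channel != expired_channel(db):
        return None
    data = message["data"]
    return data.decode() if isinstance(data, bytes) else data
```

The event carries only the key name, not the value, so the task payload has to live under a separate key that does not expire.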

Coder1337