20

Our Current Design

Environment: Redis 2.8.17

We have implemented our reliable queue using a pattern similar to the one described in the Redis documentation under RPOPLPUSH.

However, we are using BRPOPLPUSH because of its blocking nature, and LPUSH to ensure FIFO order.

Producers: multiple threads (on multiple servers) using LPUSH to push items.

Consumers: multiple threads (on multiple servers) using BRPOPLPUSH to process items.

BRPOPLPUSH q processing-q

As documented, Redis pops an item from queue 'q' and atomically adds it to 'processing-q'.
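
For concreteness, a minimal sketch of this setup, using Python's redis-py client (an illustration on our part, not our actual code):

```python
import redis

r = redis.Redis(decode_responses=True)  # assumes a reachable Redis 2.8+ instance

def produce(item):
    # Producers LPUSH onto the head of 'q'; BRPOPLPUSH pops from the
    # tail, so the combination preserves FIFO order.
    r.lpush("q", item)

def consume():
    # Blocks until an item is available, then atomically moves it
    # from the tail of 'q' to the head of 'processing-q'.
    item = r.brpoplpush("q", "processing-q", timeout=0)  # timeout=0 blocks forever
    handle(item)
    # Removing *this* item from 'processing-q' afterwards is where we
    # run into trouble, as described below.

def handle(item):
    pass  # placeholder for our application-specific processing
```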

Problem

Owing to the multi-threaded (async) nature of our application, we have no control over when the consumers will complete their processing.

So, if we use LREM (as per the documentation) to remove the processed element from processing-q, it will only remove the top element of processing-q, with no guarantee that it removed the actual element processed by the respective consumer.

So if we don't do anything, processing-q keeps growing (eating up memory), which is very bad IMHO.

Any suggestions or ideas?

aspdeepak
  • Here's a good presentation titled [Redis as a Reliable Work Queue](https://www.percona.com/news-and-events/percona-university-smart-data-raleigh/using-redis-reliable-work-queue) and the [presenter's blog post](http://blog.bronto.com/engineering/reliable-queueing-in-redis-part-1/) on the same subject. It explains a more advanced queue design modelled in Redis than the one in the RPOPLPUSH docs. Failure recovery is also covered. – saaj Dec 31 '15 at 11:20

4 Answers

27

You just need to include the job you want to delete in your call to LREM.

LREM takes the form:

LREM queue count "object"

It will remove count items equal to "object" from queue. So to remove the specific job your consumer thread is working on, you'd do something like this:

LREM processing-q 1 "job_identifier"

For more see the documentation here: http://redis.io/commands/lrem

Then to handle crashed consumers and abandoned jobs you can use SETEX to create locks with an expiration and periodically check for jobs without locks.
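
For instance, the lock half of that might look like this in Python with redis-py (an assumed client; the key format mirrors the commands below, and the 60-second TTL is illustrative):

```python
import redis

r = redis.Redis(decode_responses=True)
LOCK_TTL = 60  # seconds a consumer gets before its job counts as abandoned

def acquire_lock(job_id):
    # SETEX stores a key that Redis expires automatically after LOCK_TTL,
    # so a crashed consumer's lock simply disappears.
    r.setex("lock:processing-q:%s" % job_id, LOCK_TTL, "locked")

def is_locked(job_id):
    # GET returns None once the key has expired or was never set.
    return r.get("lock:processing-q:%s" % job_id) is not None
```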

So the whole process looks like this:

Producer

  1. RPUSH q "job_identifier"

Consumer

  1. SETEX lock:processing-q:job_identifier 60 (set the lock first to avoid a race condition)
  2. BRPOPLPUSH q processing-q
  3. Process the job
  4. LREM processing-q 1 "job_identifier" (see the sketch after this list)
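
A sketch of that consumer in Python (redis-py assumed; `process_job` is a placeholder). Note that the job identifier is only known after the pop, so in code the lock is set immediately afterwards; the comment thread below discusses the resulting race window:

```python
import redis

r = redis.Redis(decode_responses=True)
LOCK_TTL = 60  # must exceed the longest expected processing time

def consumer_loop():
    while True:
        # Block until a job arrives; it moves to processing-q atomically.
        job_id = r.brpoplpush("q", "processing-q", timeout=0)
        # The identifier is only known after the pop, so the lock is set
        # right after it (see the race-condition discussion in the comments).
        r.setex("lock:processing-q:%s" % job_id, LOCK_TTL, "locked")
        process_job(job_id)
        # Remove exactly this job from processing-q (count = 1).
        r.lrem("processing-q", 1, job_id)

def process_job(job_id):
    pass  # placeholder for the real work
```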

Expired Jobs Monitor

  1. jobs = LRANGE processing-q 0 -1
  2. foreach job in jobs: lock = GET lock:processing-q:job_identifier
  3. if lock is null, the job timed out, so remove it from processing-q: LREM processing-q 1 "job_identifier"
  4. and retry it with RPUSH q "job_identifier" (a sketch of this monitor follows the list)
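
And a matching sketch of the monitor (same assumptions; the `removed` check guards against two monitors requeueing the same job twice):

```python
import redis

r = redis.Redis(decode_responses=True)

def expired_jobs_monitor():
    # Step 1: inspect everything currently in processing-q.
    for job_id in r.lrange("processing-q", 0, -1):
        # Step 2: a missing lock means the consumer's SETEX expired.
        if r.get("lock:processing-q:%s" % job_id) is None:
            # Step 3: take the timed-out job out of processing-q...
            removed = r.lrem("processing-q", 1, job_id)
            # Step 4: ...and retry it, unless another monitor beat us to it.
            if removed:
                r.rpush("q", job_id)
```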

@NotAUser has published an open-source Java implementation here: https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq

Loren_
  • This looks easier and better to implement than the suggested accepted answer. – lucaswxp Nov 02 '16 at 22:04
  • In case this helps someone, I made a quick and dirty implementation of that technique in Go: https://gist.github.com/brunocassol/ee97631ce2126e7b0f36d771a028d104 – Bruno Cassol Mar 11 '17 at 08:08
  • Isn't a fourth step missing in Expired Jobs Monitor for removing the task from processing-q, much like the fourth step found in Consumer? – utxeee Jul 10 '17 at 10:16
  • This doesn't work, unfortunately. There's a race condition between BRPOPLPUSH and SETEX, so the expired-jobs monitor might retry objects even if they have just started. – NotAUser Aug 19 '17 at 11:10
  • @NotAUser - If you SETEX before BRPOPLPUSH, does that clear up your race condition? – Loren_ Aug 24 '17 at 17:59
  • Yes! That's what I ended up doing, thanks for the follow-up. Btw I ended up implementing this: https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq – NotAUser Aug 25 '17 at 10:15
  • I guess I'm missing something obvious, but how can you set a lock using a job ID before getting the job ID? – user0103 May 28 '18 at 15:35
  • @user0103 "job_identifier" is some unique id your system uses for the job. In our case it's frequently an orderId, a generated random number, or sometimes even a primary key from our database. – Loren_ Sep 18 '18 at 19:13
  • In Expired Jobs Monitor, shouldn't step 4 happen before step 3? It's better to have a task received by a consumer more than once than never. – Tom Raganowicz Jan 12 '19 at 17:09
  • I'm facing the same trouble as @user0103: I can't set the lock in Consumer step 1, as I don't have the job_identifier at this stage yet. – Tom Raganowicz Jan 14 '19 at 07:04
  • @NeverEndingQueue you have to generate it by yourself – user0103 Jan 25 '19 at 23:48
  • @user0103 How can I generate something that relates to something that I've never seen? – Tom Raganowicz Jan 26 '19 at 06:38
  • @NeverEndingQueue that's not the point; I thought the same way as you initially. RPUSH essentially just pushes a _value_ onto the list. This _value_ should represent your job. Basically you can put something like `{jobId: GENERATED_ID, data: {}}` there. Then you use the same GENERATED_ID to call `SETEX`. And then, when you want to check that a job is alive, you iterate `processing-q`, extract the ID from the value, and check that the lock exists. Redis doesn't know anything about a 'job'; you encode your 'job' object as a string or something, so you decide how you can identify this job in the future. – user0103 Jan 26 '19 at 09:53
  • I'm not sure if I'm being dumb now, but I still can't get the idea. The producer using `RPUSH` adds the string with `GENERATED_ID` and the job content. Then the consumer runs `SETEX` using either the full job string or just the `GENERATED_ID`, but how can the consumer issue that command before polling `q` first? Is there any other communication between `producer` and `consumer` outside of this queue mechanism? – Tom Raganowicz Jan 27 '19 at 11:25
12

The approach I would take is to use a per-consumer processing-q (e.g. processing-q:consumer-id). That would solve your current problem, but you'd still need to handle crashed consumers somehow. For that, I suggest you also keep the last time each consumer popped a task and periodically check for timeouts. If a consumer has exceeded the timeout, move its task back to the main queue and delete its queue.
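
A possible shape for this, sketched in Python with redis-py (the key names and the 60-second timeout are illustrative, not prescribed by the answer):

```python
import time
import redis

r = redis.Redis(decode_responses=True)
TIMEOUT = 60  # seconds without a pop before a consumer is presumed dead

def consume(consumer_id):
    my_q = "processing-q:%s" % consumer_id  # per-consumer processing queue
    job = r.brpoplpush("q", my_q, timeout=0)
    r.hset("consumers:last-pop", consumer_id, time.time())  # heartbeat
    # ... process job ...
    r.lrem(my_q, 1, job)

def reap_dead_consumers():
    for consumer_id, last_pop in r.hgetall("consumers:last-pop").items():
        if time.time() - float(last_pop) > TIMEOUT:
            dead_q = "processing-q:%s" % consumer_id
            # Move any abandoned tasks back to the main queue, then
            # forget the dead consumer and its (now empty) queue.
            while r.rpoplpush(dead_q, "q"):
                pass
            r.hdel("consumers:last-pop", consumer_id)
```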

Itamar Haber
4

In a similar project, I'm using the hostname and the process id of the worker for the backup queues. Each worker has its own backup queue, and if the worker dies, the item is not lost.

Check the README and the implementation for more details.
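
The queue-naming idea might look like this (a hedged sketch in Python, not the linked implementation's actual code):

```python
import os
import socket
import redis

r = redis.Redis(decode_responses=True)

# Each worker owns a backup queue keyed by hostname and pid, so an item
# in flight when the worker dies stays findable in that worker's queue.
backup_q = "backup:%s:%s" % (socket.gethostname(), os.getpid())

item = r.brpoplpush("q", backup_q, timeout=0)
# ... process item ...
r.lrem(backup_q, 1, item)
```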

soveran
  • @soveran, so what happens when a worker dies in the middle of processing and is restarted with a different process ID? How does the new worker process look up its predecessor's backup queue? – skyork Jun 15 '15 at 20:02
0

In addition to the proposed solutions, you could also LTRIM the processing queue to a length that makes sense for your service. This would ensure the processing queue never grows out of proportion.

But you'll start losing items if the trim limit is hit. This may or may not be acceptable for your use case.

http://redis.io/commands/ltrim
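
For example (a sketch; the cap of 10,000 is arbitrary):

```python
import redis

r = redis.Redis(decode_responses=True)

# BRPOPLPUSH pushes onto the head of processing-q, so keeping indexes
# 0..9999 retains the 10,000 most recently moved items and silently
# drops anything older (the data-loss trade-off noted above).
r.ltrim("processing-q", 0, 9999)
```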