
The scenario (I've simplified things):

  • Many end users can start jobs (heavy jobs, like rendering a big PDF for example), from a front end web application (producer).
  • The jobs are sent to a single durable RabbitMQ queue.
  • Many worker applications (consumers) process those jobs and write the results back to a datastore.

This fairly standard pattern is working fine.

The problem: if a user starts 10 jobs in the same minute, and only 10 worker applications are up at that time of day, this end user effectively takes over all the compute time for himself.

The question: How can I make sure only one job per end user is processed at any given time? (Bonus: some end users (admins for example) must not be throttled.)

Also, I do not want the front end application to block end users from starting concurrent jobs. I just want the end users to wait for their concurrent jobs to finish one at a time.

The solution?: Should I dynamically create one auto-delete exclusive queue per end user? If yes, how can I tell the worker applications to start consuming this queue? How can I ensure that one (and only one) worker will consume from this queue?

Pierre-David Belanger
  • Make one queue per worker. Then you can compute something like userid % workercount and route accordingly, as in http://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html. That way, only one task from a given user can be processed at a time. – gabba Feb 09 '15 at 22:00
  • I see three problems with this approach: 1) the number of workers must be relatively static for this algorithm to work properly, 2) producers need to know in real time how many consumers are up, 3) the workload may not be fairly distributed amongst the workers if some users start more jobs than others. Thank you, but I was hoping someone would help me understand how I can set up my RabbitMQ queues and exchanges to achieve this (if it is possible at all :)). – Pierre-David Belanger Feb 10 '15 at 00:01
  • If I understand you correctly, you need something that automatically creates and terminates the workers and evenly distributes tasks. Try adding a dispatcher node, or several, depending on how many tasks you have. The dispatcher will add a queue per user, and when a worker finishes all of a user's tasks, it sends a message to the dispatcher to remove that queue. One worker could process multiple user queues, but in your case only one worker can process one user (except admins). – gabba Feb 10 '15 at 07:39
  • Use redis or zookeeper to control the number of concurrent users being processed – Robinho Jan 19 '16 at 22:55
  • Robinho, care to expand your comment into an answer? – WW. Oct 13 '16 at 10:21
  • @WW., this is a little meta, but does the fact that I accepted an answer invalidate your bounty? I do not know how to behave in this situation. Maybe I can un-accept Dimos's answer and wait until YOU are satisfied with an answer? – Pierre-David Belanger Oct 13 '16 at 13:15
  • It's your question, accept the answer you like best. – WW. Oct 13 '16 at 20:54

2 Answers

7

You would need to build something yourself to implement this, as Dimos says. Here is an alternative implementation which requires an extra queue and some persistent storage.

  • As well as the existing queue for jobs, create a "processable job queue". Only jobs that satisfy your business rules are added to this queue.
  • Create a consumer (named "Limiter") for the job queue. The Limiter also needs persistent storage (e.g. Redis or a relational database) to record which jobs are currently running. The Limiter reads from the job queue and writes to the processable job queue.
  • When a worker application finishes processing a job, it adds a "job finished" event to the job queue.

    ------------     ------------     ----------- 
    | Producer | -> () job queue ) -> | Limiter | 
    ------------     ------------     ----------- 
                         ^                |                    
                         |                V                    
                         |     ------------------------       
                         |    () processable job queue )  
           job finished  |     ------------------------       
                         |                |
                         |                V
                         |     ------------------------
                         \-----| Job Processors (x10) |
                               ------------------------
    

The logic for the limiter is as follows:

  • When a job message is received:
    • If the job is for an admin user, always add it straight to the processable job queue (admins are never throttled).
    • Otherwise, check the persistent storage to see if a job is already running for the current user:
      • If not, record the job in the storage as running and add the job message to the processable job queue.
      • If an existing job is running, record the job in the storage as a pending job.
  • When a "job finished" message is received, remove that job from the "running jobs" list in the persistent storage. Then check the storage for a pending job for that user:
    • If a job is found, change the status of that job from pending to running and add it to the processable job queue.
    • Otherwise, do nothing.
  • Only one instance of the limiter process can run at a time. This could be achieved either by only starting a single instance of the limiter process, or by using locking mechanisms in the persistent storage.
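The Limiter's bookkeeping can be sketched as follows. This is a minimal Python sketch using in-memory structures as a stand-in for the persistent store (Redis or a relational database in practice, with the locking the answer describes); all class and method names here are illustrative, not part of any real library:

```python
from collections import deque

class Limiter:
    """Per-user throttle: at most one running job per non-admin user."""

    def __init__(self):
        self.running = set()    # user ids with a job currently in flight
        self.pending = {}       # user id -> deque of jobs waiting their turn
        self.processable = []   # stand-in for the "processable job queue"

    def on_job(self, user_id, job, is_admin=False):
        """Handle a new job message read from the job queue."""
        if is_admin:
            # Admin jobs bypass the throttle entirely.
            self.processable.append(job)
        elif user_id in self.running:
            # User already has a job in flight: park this one as pending.
            self.pending.setdefault(user_id, deque()).append(job)
        else:
            # No job in flight for this user: mark running and release it.
            self.running.add(user_id)
            self.processable.append(job)

    def on_job_finished(self, user_id):
        """Handle a 'job finished' event from a worker."""
        self.running.discard(user_id)
        queue = self.pending.get(user_id)
        if queue:
            # Promote the oldest pending job for this user to running.
            self.running.add(user_id)
            self.processable.append(queue.popleft())
```

In a real deployment, `processable.append` would be a publish to the processable job queue, and `running`/`pending` would live in the persistent store so a restarted Limiter picks up where it left off.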

It's fairly heavyweight, but you can always inspect the persistent storage if you need to see what's going on.

Nathan
  • Thank you for this answer. This `Limiter` concept with a persistent storage feels indeed heavyweight. Also I do not like the fact that this process will somehow be a single point of failure. – Pierre-David Belanger Oct 13 '16 at 12:44
    You could start multiple instances of the Limiter process across multiple servers and rely on the locking mechanism in the persistent storage to ensure that only one Limiter can process at a time. That way, if one server failed, the other Limiter processes would continue processing. – Nathan Oct 13 '16 at 12:50
  • Indeed, sorry, you are right. Only one limiter process can run at a time, but many can be started. – Pierre-David Belanger Oct 13 '16 at 13:00
4

Such a feature is not provided natively by RabbitMQ. However, you could implement it in the following way. You will have to use polling, though, which is not as efficient as subscribing/publishing. You will also have to leverage ZooKeeper for coordination between the different workers.

You will create 2 queues: 1 high-priority queue (for the admin jobs) and 1 low-priority queue (for the normal user jobs). The 10 workers will retrieve messages from both queues. Each worker will run an infinite loop (ideally sleeping for an interval when the queues are empty), in which it attempts to retrieve a message from each queue alternately:

  • For the high-priority queue, the worker just retrieves a message, processes it and acknowledges to the queue.
  • For the low-priority queue, the worker attempts to acquire a lock in ZooKeeper (by writing to a specific znode), and if successful, it reads a message, processes it and acknowledges it. If the ZooKeeper write was unsuccessful, someone else holds the lock, so the worker skips this step and repeats the loop.
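One worker's polling loop could be sketched like this. The `lock`, queue objects, and `process` callback below are hypothetical stand-ins (a ZooKeeper znode lock, and `basic_get`/`basic_ack` on the two RabbitMQ queues, in a real implementation); this only illustrates the control flow described above:

```python
import time

def worker_loop(high_q, low_q, lock, process, should_stop):
    """Alternate between the admin (high-priority) and normal
    (low-priority) queues, guarding the latter with a global lock."""
    while not should_stop():
        # High-priority (admin) jobs: no throttling, just consume.
        msg = high_q.get_message()
        if msg is not None:
            process(msg)
            high_q.ack(msg)

        # Low-priority jobs: read-process-ack is a critical section,
        # so only the lock holder may touch the queue.
        if lock.try_acquire():
            try:
                msg = low_q.get_message()
                if msg is not None:
                    process(msg)
                    low_q.ack(msg)
            finally:
                lock.release()

        time.sleep(0.01)  # back off between polls; tune in practice
```

Note that, per the clarification in the comments below, the lock is a single global one covering the whole read-process-ack sequence, so at most one normal-user job is in flight at any time.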
Dimos
  • So there is a Zookeeper lock for each end user that might upload a file? If the first message on the low-priority queue is for a user already processing something, then no worker will be able to process any of the messages on that queue? Or am I misunderstanding? – WW. Oct 12 '16 at 23:14
  • My question is a couple of months old now. I have since switched to SQS instead of RabbitMQ, but my original question remains; I still have this problem with my SQS-based implementation. I will probably try to implement what you are proposing in this answer. Since I already poll SQS, this part is OK. But I do not want to add another component to my stack, so I will not use Zookeeper, but Redis (I already use it for caching and counters). Anyway, thank you for this answer. I accept it. – Pierre-David Belanger Oct 13 '16 at 12:57
  • @WW. there is a single (global) lock for all the messages of the low-priority queue. Each of the 10 worker processes should first acquire this lock, before reading a message from the queue. Furthermore, every worker must release the lock after having processed the message. Essentially, the read-process-ack processing of a message is being converted into a critical section in this way, so at most one worker can execute it at all times. – Dimos Oct 14 '16 at 18:51
  • @Pierre-DavidBelanger, as an afterthought, polling is not necessary. You could implement it with pub/sub, but the queue would have to notify all the workers of each new message, and only the first to acquire the lock would consume it. Redis can also be an equal alternative to Zookeeper, since you can use optimistic [locking & watches](http://redis.io/topics/transactions#optimistic-locking-using-check-and-set) to create a highly scalable solution. – Dimos Oct 14 '16 at 18:56