3

I am trying to limit the rate of one Celery task. Here is how I am doing it:

from project.celery import app

app.control.rate_limit('task_a', '10/m')

It is working well. However, there is a catch: other tasks that this worker is responsible for are blocked as well.

Let's say 100 instances of task_a have been scheduled. Because of the rate limit, it will take 10 minutes to execute all of them. During this time, task_b is scheduled as well, but it will not be executed until task_a is done.

Is it possible to not block task_b?

By the looks of it, this is just how it works. I just didn't get that impression after reading the documentation.

Other options include:

  • A separate worker and queue only for this task
  • Adding an eta to task_a so that all of its instances are scheduled to run during the night (a rough sketch follows this list)
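
For the eta option, this is roughly what I have in mind (the 02:00 UTC target and the import path are just placeholders):

from datetime import datetime, timedelta, timezone

from project.tasks import task_a  # placeholder import path

# Push the work to tonight's off-peak window instead of running it now.
now = datetime.now(timezone.utc)
tonight = now.replace(hour=2, minute=0, second=0, microsecond=0)
if tonight <= now:
    tonight += timedelta(days=1)

task_a.apply_async(eta=tonight)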

What is the best practice in such cases?

Jahongir Rahmonov

2 Answers

1

The easiest (no coding required) way is to separate the task into its own queue and run a dedicated worker just for this purpose.

There's no shame in that; it is totally fine to have many Celery queues and workers, each dedicated to a specific type of work. As an added bonus, you get more control over execution: you can easily turn workers ON/OFF to pause certain processing if needed, etc.
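
A minimal sketch of that setup, assuming Celery 4+ and purely illustrative queue and module names:

# celeryconfig.py (or wherever your Celery settings live)
# Send only the rate-limited task to its own queue; everything else stays on the default queue.
task_routes = {
    'project.tasks.task_a': {'queue': 'rate_limited'},
}

# Then run one worker per queue, e.g.:
#   celery -A project worker -Q rate_limited -n limited@%h
#   celery -A project worker -Q celery -n default@%h

Pausing the rate-limited work is then just a matter of stopping the dedicated worker.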

On the other hand, having lots of specialized workers that sit idle most of the time (waiting for a specific job to be queued) is not particularly memory-efficient.

Thus, if you need to rate-limit more tasks and expect their dedicated workers to be idle most of the time, you may want to increase efficiency by implementing a Token Bucket. With that, all your workers can be general-purpose and you can scale them naturally as your overall load increases, knowing that work distribution will no longer be crippled by a single task's rate limit.
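
To make the idea concrete, here is a minimal in-process token bucket sketch; a real deployment would typically keep the bucket state somewhere shared (e.g. Redis) so that every worker draws from the same budget:

import threading
import time

class TokenBucket:
    """Allow up to `rate` operations per `per` seconds, refilling continuously."""

    def __init__(self, rate, per):
        self.capacity = rate            # maximum tokens the bucket can hold
        self.tokens = rate              # start with a full bucket
        self.refill_rate = rate / per   # tokens added per second
        self.updated = time.monotonic()
        self.lock = threading.Lock()

    def try_acquire(self):
        """Take one token if available; return False so the caller can retry later."""
        with self.lock:
            now = time.monotonic()
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.updated) * self.refill_rate)
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

# Example: at most 10 task_a executions per minute.
bucket = TokenBucket(rate=10, per=60)

A task that fails to get a token can re-queue itself (for example with the task's retry(countdown=...)) instead of tying up a worker slot.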

knaperek
0

This should be part of the task declaration to work on a per-task basis. The way you are doing it via control is probably why it has this side effect on other tasks:

from project.celery import app

@app.task(rate_limit='10/m')  # declare the rate limit on the task itself
def task_a():
    ...

After more reading, the docs note:

Note that this is a per worker instance rate limit, and not a global rate limit. To enforce a global rate limit (e.g., for an API with a maximum number of requests per second), you must restrict to a given queue.

You will probably have to do this in a separate queue; a rough sketch follows.
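
Here is what that could look like, with an illustrative queue name (the routing mirrors the dedicated-queue setup from the other answer):

from project.celery import app

# Send task_a to its own queue so its rate limit cannot hold up other tasks.
app.conf.task_routes = {
    'project.tasks.task_a': {'queue': 'task_a_queue'},
}

# Consume that queue with a single worker so the per-worker limit is effectively global:
#   celery -A project worker -Q task_a_queue -n task_a_worker@%h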

Sardorbek Imomaliev