I'm working on a new monitoring system that can measure Celery queue throughput and help alert the team when the queue is getting backed up. Over the course of my work, I've come across some peculiar behaviors that I don't understand and that are not well documented in the Celery docs.
For testing purposes, I've set up an endpoint that populates the queue with 16 long-running tasks, which I use to simulate a backed-up queue. The framework is Flask and the queue broker is Redis. Celery is configured so that each worker handles up to 4 tasks in parallel, and I have 2 workers running.
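api/health.py
from flask import Blueprint, make_response

# sleepy_job is the Celery task defined in jobs.py.

def health():
    health = Blueprint("health", __name__)

    @health.route("/api/debug/create-long-queue", methods=["GET"])
    def long_queue():
        # Enqueue 16 long-running tasks to simulate a backed-up queue.
        for i in range(16):
            sleepy_job.delay()
        return make_response({}, 200)

    return health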
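jobs.py
import time

# celery (the app instance) and HIGH_PRIORITY are defined elsewhere in the project.
@celery.task(priority=HIGH_PRIORITY)
def sleepy_job(*args, **kwargs):
    # Occupy one worker slot for 30 seconds.
    time.sleep(30)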
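For reference, the worker setup described above (Redis broker, 4 parallel tasks per worker) might correspond to a configuration module along these lines. This is only a sketch: the file name and broker URL are assumptions, and the prefetch multiplier is shown at Celery's documented default because the post does not say it was changed.

```python
# celeryconfig.py (hypothetical file name, not part of the original project)

# Assumed Redis location; the real broker URL is not given in the post.
broker_url = "redis://localhost:6379/0"

# Each worker runs up to 4 tasks in parallel.
worker_concurrency = 4

# Celery's default value. The number of messages a worker may reserve ahead
# of time is this multiplier times the worker's concurrency.
worker_prefetch_multiplier = 4
```

The same concurrency can also be set per worker on the command line with `--concurrency=4`.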
Here's what I do to simulate a backed-up production queue:
- I call /api/debug/create-long-queue to simulate a backup in my queue. Based on the above math, the workers should be busy sleeping for 1 minute each. (Together, they can handle 8 tasks at a time. Each task just sleeps for 30 seconds, and there are 16 tasks total.)
- I make another API call shortly after (< 5 s), which kicks off a different job with real business logic (processing of an inbound webhook API call). We'll call this job handle_incoming_message.
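The 1-minute estimate in the first step can be checked with a few lines of arithmetic. This is just a sketch of the reasoning above; the constant names are made up for illustration.

```python
import math

WORKERS = 2                 # workers running
CONCURRENCY_PER_WORKER = 4  # parallel tasks per worker
TASKS = 16                  # sleepy_job tasks enqueued by the endpoint
TASK_SECONDS = 30           # each sleepy_job sleeps this long

# Total number of tasks that can run at the same time.
slots = WORKERS * CONCURRENCY_PER_WORKER

# Number of back-to-back rounds needed to drain all tasks.
batches = math.ceil(TASKS / slots)

# Time during which every slot is occupied by a sleepy_job.
expected_busy_seconds = batches * TASK_SECONDS

print(slots, batches, expected_busy_seconds)  # 8 2 60
```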
Here's what I see using Flower to inspect the queue:
- While all workers are blocked by the first 8 sleepy_job tasks, I see no sign of the new handle_incoming_message on the queue, even though I am certain handle_incoming_message.delay() has been called as a result of the 2nd API call.
- After the first 8 sleepy_job tasks have been completed (~30 s), I see the new handle_incoming_message on the queue with state RECEIVED.
- After the second (and final) 8 sleepy_job tasks have been completed, I now see handle_incoming_message has state STARTED (and I can confirm this, as the UI updates with the new data that was received and processed in that task).
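The timing of the STARTED state above is what a plain first-in, first-out model of the 8 worker slots would predict. The sketch below is a toy simulation under that assumption only: it ignores priorities, prefetching, and acknowledgement, it is not how Celery schedules internally, and the 1-second duration for handle_incoming_message is a made-up placeholder.

```python
import heapq


def fifo_start_times(tasks, slots):
    """Return [(name, start_time)] for tasks served strictly in list order.

    tasks: list of (name, enqueue_time, duration) in queue order.
    slots: number of tasks that can run at the same time.
    """
    free_at = [0.0] * slots  # min-heap: time at which each slot becomes free
    heapq.heapify(free_at)
    starts = []
    for name, enqueued, duration in tasks:
        slot_free = heapq.heappop(free_at)  # earliest available slot
        start = max(slot_free, enqueued)    # cannot start before it is enqueued
        starts.append((name, start))
        heapq.heappush(free_at, start + duration)
    return starts


# 16 sleepy_job tasks at t=0, then the webhook job about 5 seconds later.
queue = [("sleepy_job", 0.0, 30.0)] * 16
queue.append(("handle_incoming_message", 5.0, 1.0))  # duration is hypothetical

starts = fifo_start_times(queue, slots=8)
print(starts[-1])  # ('handle_incoming_message', 60.0)
```

In this model the new task cannot start before the 60-second mark, which matches the observation. What the model does not explain is why the task only becomes visible as RECEIVED at around 30 seconds.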
Questions
So it seems clear that when the workers are momentarily unblocked after handling the first 8 sleepy_job tasks, they are doing something to mark/acknowledge the new handle_incoming_message task in a way that is visible to Flower. But this leaves several unanswered questions:
- What is the state of the new handle_incoming_message task when the workers are blocked?
- What changes after workers are unblocked that makes it so Flower now has visibility into the new handle_incoming_message task?
- What does the "RECEIVED" state actually mean?
- (Bonus: How can I get visibility into tasks that are queued while workers are blocked?)
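One possible starting point for the bonus question is to read the broker directly, since with Redis the pending messages sit in Redis lists. The sketch below assumes the default queue name `celery` and the priority-suffixed key names that the Celery documentation describes for the Redis transport (separator `\x06\x16`, steps 0, 3, 6, 9). The helper names are hypothetical, and `client` can be any object with an `llen(key)` method, such as a `redis.Redis` instance.

```python
PRIORITY_SEP = "\x06\x16"       # separator used in priority queue key names
PRIORITY_STEPS = (0, 3, 6, 9)   # default priority steps for the Redis transport


def priority_queue_keys(queue="celery", steps=PRIORITY_STEPS):
    """Return the Redis list keys that may hold messages for one queue."""
    # Step 0 uses the bare queue name; other steps append separator + step.
    return [queue if step == 0 else f"{queue}{PRIORITY_SEP}{step}" for step in steps]


def pending_by_key(client, queue="celery"):
    """Return {redis_key: number_of_waiting_messages} for one queue."""
    return {key: client.llen(key) for key in priority_queue_keys(queue)}


# Example usage (assumes the redis-py package and a local Redis server):
#   import redis
#   client = redis.Redis(host="localhost", port=6379, db=0)
#   print(pending_by_key(client))
```

As far as I understand, messages that a worker has already reserved are no longer in these lists, so this count would only cover tasks that no worker has picked up yet.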