
I am very new to Celery and here is the question I have:

Suppose I have a script that is supposed to constantly fetch new data from the DB and send it to workers using Celery.

tasks.py

# Celery Task
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def process_data(x):
    # Do something with x
    pass

fetch_db.py

# Fetch new data from DB and dispatch to workers.
import time

from tasks import process_data

while True:
    # Run DB query here to fetch new data into fetched_data
    fetched_data = None  # placeholder: replace with the DB query result

    process_data.delay(fetched_data)

    time.sleep(30)

Here is my concern: the data is fetched every 30 seconds, but process_data() could take much longer, and depending on the number of workers (especially if there are too few), the queue might get throttled, as I understand it.

  1. I cannot increase the number of workers.
  2. I can modify the code to refrain from feeding the queue when it is full.

The question is: how do I set the queue size, and how do I know when it is full? In general, how should I deal with this situation?
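A minimal sketch of point 2 (refraining from feeding the queue), assuming RabbitMQ as the broker: before each dispatch, ask the broker how many messages are waiting and skip the dispatch when that count reaches a threshold you pick. The names rabbitmq_queue_length, dispatch_if_room and max_pending are hypothetical; the broker check relies on Celery's app.connection_or_acquire() and a passive queue_declare, which returns the queue name, ready-message count and consumer count.

```python
from typing import Any, Callable


def rabbitmq_queue_length(app: Any, queue_name: str = "celery") -> int:
    """Ask RabbitMQ how many messages are ready in queue_name.

    A passive declare only inspects an existing queue (it errors if the queue
    does not exist). The count excludes messages already prefetched by workers
    but not yet acknowledged.
    """
    with app.connection_or_acquire() as conn:
        _, message_count, _ = conn.default_channel.queue_declare(
            queue=queue_name, passive=True
        )
        return message_count


def dispatch_if_room(
    send: Callable[[Any], Any],
    queue_length: Callable[[], int],
    data: Any,
    max_pending: int,
) -> bool:
    """Call send(data) only while fewer than max_pending messages are waiting."""
    if queue_length() >= max_pending:
        return False  # treat the queue as "full" and skip this round
    send(data)
    return True
```

In fetch_db.py the loop body could then become something like dispatch_if_room(process_data.delay, lambda: rabbitmq_queue_length(app), fetched_data, max_pending=100), where app is the Celery instance from tasks.py, "celery" is Celery's default queue name, and 100 is an arbitrary threshold. Data that is skipped is not sent, so the DB query would need to return it again on a later iteration.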

jazzblue
  • Add more workers to catch up with the queue – noorul Feb 05 '16 at 23:35
  • @noorul: this was not my question. I can't add more workers. I can only refrain from feeding the queue if it is full. My question is how do I set its size and how do I know it is full. – jazzblue Feb 05 '16 at 23:37
  • I am not sure how we can define the fullness of a Celery queue. Maybe you can put another queue in front of the Celery queue and control that. – noorul Feb 06 '16 at 00:15
  • The queue will not get throttled unless you configure it to. However, I would say that it looks like you need to rethink your design. It's hard to say how, since the code you give is too generic. – scytale Feb 08 '16 at 14:32
  • Can you change process_data so it records a status somewhere, so you can check how many of them are still running? – RemcoGerlich Feb 10 '16 at 08:10
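RemcoGerlich's idea of recording status can be approximated without changing process_data, assuming a result backend is configured (the tasks.py above only sets a broker, and without a backend AsyncResult.ready() raises an error): keep the AsyncResult objects returned by .delay() and only dispatch while fewer than a chosen number are unfinished. InFlightLimiter and its methods are hypothetical names; the class only relies on each result having a ready() method, and it counts only tasks sent by this one script.

```python
from typing import Any, Callable, List, Protocol


class _Result(Protocol):
    """Anything with ready(), such as Celery's AsyncResult."""

    def ready(self) -> bool: ...


class InFlightLimiter:
    """Cap how many dispatched tasks may be unfinished at the same time."""

    def __init__(self, max_in_flight: int) -> None:
        self.max_in_flight = max_in_flight
        self._pending: List[_Result] = []

    def prune(self) -> int:
        # Forget results whose tasks have finished (succeeded or failed).
        self._pending = [r for r in self._pending if not r.ready()]
        return len(self._pending)

    def try_dispatch(self, send: Callable[[Any], _Result], data: Any) -> bool:
        # Refuse to send while the cap of unfinished tasks is reached.
        if self.prune() >= self.max_in_flight:
            return False
        self._pending.append(send(data))
        return True
```

With Celery this would be used as limiter.try_dispatch(process_data.delay, fetched_data) inside the fetch loop. Unlike checking the broker queue, this also counts tasks that workers have already picked up but not finished.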

1 Answer


You can set the RabbitMQ x-max-length argument when predeclaring the queue with kombu.

Example:

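from celery import Celery
from kombu import Queue, Exchange

class Config(object):
    # Celery 3.x-style setting names; Celery 4+ calls these broker_url and task_queues
    BROKER_URL = "amqp://guest@localhost//"

    CELERY_QUEUES = (
        Queue(
            'important',
            exchange=Exchange('important'),
            routing_key="important",
            # RabbitMQ keeps at most 10 messages in this queue
            queue_arguments={'x-max-length': 10}
        ),
    )

app = Celery('tasks')
app.config_from_object(Config)


# Calls to this task are routed to the length-limited 'important' queue
@app.task(queue='important')
def process_data(x):
    pass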

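Or, using policies (the pattern "^one-meg$" selects which queues the policy applies to, so it has to match your queue name, e.g. "^important$"):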

rabbitmqctl set_policy Ten "^one-meg$" '{"max-length-bytes":1000000}' --apply-to queues
faisal burhanudin
  • What happens with the .delay() call in this case? – RemcoGerlich Feb 10 '16 at 08:08
  • The task will be added to the queue, but if the queue limit is reached, `Messages will be dropped or dead-lettered from the front of the queue to make room for new messages once the limit is reached.` ([rabbit](https://www.rabbitmq.com/maxlength.html)). According to an issue on [github](https://github.com/rabbitmq/rabbitmq-server/issues/499), blocking publishers when the queue length limit is reached is impossible right now. – faisal burhanudin Feb 10 '16 at 08:18
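The behaviour quoted in that comment can be illustrated locally, as a rough analogy only: Python's collections.deque with maxlen discards from the opposite end when you append, which mirrors RabbitMQ dropping messages from the front of a full queue. The "queue" below is a plain in-memory deque and MAX_LENGTH is an illustrative value, not a broker queue or Celery setting.

```python
from collections import deque

# In-memory stand-in for a queue declared with x-max-length=3 (analogy only).
MAX_LENGTH = 3
queue = deque(maxlen=MAX_LENGTH)

for task_id in range(5):
    # Publishing always "succeeds"; once full, the oldest entry is discarded.
    queue.append(task_id)

print(list(queue))  # [2, 3, 4]: tasks 0 and 1 were dropped from the front
```

The point of the analogy is that with x-max-length alone the publisher is never blocked: a .delay() call returns normally, and it is the oldest queued tasks that are lost.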