
In my /etc/defaults/celeryd config file, I've set:

CELERYD_NODES="agent1 agent2 agent3 agent4 agent5 agent6 agent7 agent8"
CELERYD_OPTS="--autoscale=10,3 --concurrency=5"

I understand that the daemon spawns 8 celery workers, but I'm not fully sure what autoscale and concurrency do together. I thought that concurrency was a way to specify the maximum number of threads that a worker can use and that autoscale was a way for the worker to scale the number of child workers up and down as necessary.

The tasks have a largish payload (some 20-50 kB each) and there are roughly 2-3 million of them, but each task runs in less than a second. I'm seeing memory usage spike because the broker distributes the tasks to every worker, replicating the payload multiple times.

I think the issue is in the config: the combination of workers + concurrency + autoscaling is excessive, and I would like to get a better understanding of what these three options do.

Joseph
  • The documentation for [autoscale](http://celery.readthedocs.org/en/latest/userguide/workers.html#autoscaling) and [concurrency](http://celery.readthedocs.org/en/latest/userguide/workers.html#concurrency) is pretty clear. What bits don't you understand? In particular, it doesn't really make sense to specify both at the same time. And what exactly is your problem? The memory spike? Is this actually a problem - i.e. are you hitting swap, or seeing OOM invoked? – scytale Aug 10 '15 at 10:15
  • @scytale I'm seeing OOM invoked. Lots of processes are simply terminated with `Killed` when it spikes up. I think I'm clear on autoscale vs. concurrency now. I thought that `--autoscale` would add more workers, but it's simply a dynamic setting for specifying concurrency instead of a fixed setting with `--concurrency`. I guess my only remaining confusion is around "add more workers with less concurrency or add fewer workers with more concurrency". I don't know how to evaluate that tradeoff. – Joseph Aug 10 '15 at 17:50
  • Let's distinguish between workers and worker processes. You spawn a celery worker; this then spawns a number of processes (depending on things like --concurrency and --autoscale). There is no point in running more than one worker unless you want to do routing, listen to different queues, etc. I would say run one worker with the default number of processes (i.e. omit --concurrency and --autoscale and it will default to as many processes as there are cores). Then test your application with a view to establishing the concurrency level that suits your needs. – scytale Aug 11 '15 at 10:55
  • The memory spikes may indicate that you need to reevaluate your data structures etc. Also, if your tasks run in less than a second you are probably wasting a lot of time in messaging overhead - can you not refactor your code or change your chunk size so they run for longer? – scytale Aug 11 '15 at 10:56
  • @scytale To give some more context: The tasks call an external API to send a push notification/email to a user. The payload for the task mostly contains the info that needs to be sent to the user (i.e. the external API's payload). I used to have longer running tasks with groups of 10k and the task would loop through each user. However, this also posed a risk in that if the task were to raise any error (e.g. external service down/too many requests), then the entire task would fail and that's a lot of users to miss. (continued...) – Joseph Aug 11 '15 at 16:50
  • @scytale I would then have to keep track of how far along the task was when it failed to be able to retry without double sending (a strict no-no). Probably doable, but it didn't seem like the complexity was worth it. To counter that, I split each user's payload into its own task and now I can safely attempt retries. Note that the size of the payload wouldn't change. Just more of them will be on individual tasks. – Joseph Aug 11 '15 at 16:51
  • @scytale The other alternative I've been thinking of is to write the payload (which is what's causing the memory bloat) into a database when the messages are produced, and the task, when it's time to run, would pull the corresponding payload from the database and dispatch. This way, the task <-> message queue communication and payload is small and each task can pull from the db to do its job. – Joseph Aug 11 '15 at 16:53
  • Suggestion: simplify - use 1 celery worker and adjust the number of processes (--concurrency) as you go along so that you don't use too much memory. You mention the tasks have a payload of 20-50 kB - that is _nothing_ on modern hardware. How are you able to trigger the OOM killer? Are you running a vast number of processes or something? Try running as many procs as cores, keep putting tasks on the queue - and don't use rabbitmq as the results backend. – scytale Aug 11 '15 at 16:58
  • ah - I'm starting to see - you have a few million of those - yes, writing them to a separate store, and just passing an id to the task would lessen memory usage. even still... if you use only 1 worker on a box then it should vastly reduce the duplication - may get away with passing the data directly to celery. – scytale Aug 11 '15 at 17:00
  • @scytale Thanks for the suggestions. I'm doing a test run now with just 1 worker and 4 processes (chosen by default). It does seem like the tasks are progressing slower than I would like, but I'm playing around with the concurrency settings to see if it improves things. Do you know if message duplication happens among child processes as well? – Joseph Aug 11 '15 at 17:56
  • As far as I can tell, no, it is not duplicated among processes. Also, if your tasks are CPU-bound then there is no point in running more than the number of cores. If, however, they are I/O-bound (which they may be, if they are sending mail) then it may work better to have more processes than cores. It depends entirely on your workload and code. – scytale Aug 12 '15 at 09:49
  • @scytale I've solved almost all of my issues. The two biggest wins were: 1) moving the payload into a db and only passing the payload id to the task - this instantly stabilized rabbitmq and celery (they would occasionally buckle under the combined weight of the payload) and required very little design change - and 2) using a single worker with the appropriate number of concurrent processes to reduce duplication. Thanks for your help and patience! :) If you'd like to summarize your points above, I'd be happy to accept your answer. – Joseph Aug 14 '15 at 20:45

2 Answers


Let's distinguish between workers and worker processes. You spawn a celery worker, which then spawns a number of worker processes (depending on things like --concurrency and --autoscale; the default is to spawn as many processes as the machine has cores). There is no point in running more than one worker on a particular machine unless you want to do routing.

I would suggest running only 1 worker per machine with the default number of processes. This will reduce memory usage by eliminating the duplication of data between workers.
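For example, a minimal sketch of the celeryd config in question (the node name is illustrative; exact variables depend on your init-script setup):

CELERYD_NODES="worker1"
# leave out --concurrency and --autoscale; the pool then defaults to one process per core
CELERYD_OPTS=""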

If you still have memory issues, then save the data to a store and pass only an id to the workers.
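A rough sketch of that id-passing pattern, assuming a hypothetical fetch_payload DB lookup and a hypothetical deliver call to the external API (the broker URL is also illustrative):

from celery import Celery

app = Celery('tasks', broker='amqp://localhost')  # illustrative broker URL

@app.task(bind=True, max_retries=3)
def send_notification(self, payload_id):
    # Only the small id travels through the broker; the 20-50 kB payload
    # body stays in the database until the task actually runs.
    payload = fetch_payload(payload_id)  # hypothetical DB lookup by id
    try:
        deliver(payload)                 # hypothetical external API call
    except Exception as exc:
        # A failure affects only this one user's task, so it can be retried.
        raise self.retry(exc=exc)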

scytale
  • [The docs](http://docs.celeryproject.org/en/latest/userguide/workers.html#concurrency) say that there is an advantage to running multiple workers on one machine. Maybe this has changed in the years since you posted this. – Travis Jan 26 '17 at 18:16
  • First, the docs only say there *may* be an advantage to running multiple workers, but they suggest experimentation: "There’s even some evidence to support that having multiple worker instances running, may perform better than having a single worker." Secondly, in this case the payload was very large. Since each task is distributed to each worker, the memory requirement = size of payload * number of queued tasks * number of workers, which was causing memory issues. In this case using 1 worker would reduce memory usage. However, the better solution was to not pass such a big payload. – scytale Jan 27 '17 at 10:22
  • If you specify both `--concurrency` and `--autoscale`, which one takes precedence? – speedplane Oct 19 '17 at 17:41
  • These keywords ring a bell for me: `duplication`, `1 worker` per machine. This is enough for me to follow this wise advice without going into rocket-science analyses of this thing... thank you. – daparic Mar 25 '18 at 10:22

When using --autoscale, the number of processes is set dynamically between max/min values, which lets the worker scale according to load; when using --concurrency, the number of processes is fixed. So using these two together makes no sense.

Celery's --autoscale is responsible for growing and shrinking the pool dynamically based on load: it adds more processes when there is work to do and removes processes when the workload is low. So, for example, --autoscale=10,3 gives you a maximum of 10 processes and a minimum of 3.
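For instance (the app name proj is assumed):

celery -A proj worker --autoscale=10,3
# the pool grows toward 10 processes under load and shrinks back to 3 when idle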

As for --concurrency: celery by default uses multiprocessing to perform concurrent execution of tasks. The number of worker processes/threads can be changed using the --concurrency argument; it defaults to the number of available CPUs if not set. So, for example, --concurrency=5 would use 5 processes, meaning up to 5 tasks can run concurrently.
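The equivalent fixed-pool invocation (same assumed app name):

celery -A proj worker --concurrency=5
# fixed pool of 5 processes; at most 5 tasks execute at any moment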

WMRamadan