
I am using Celery with RabbitMQ to process data from API requests. The process goes as follows:

Request > API > RabbitMQ > Celery Worker > Return

Ideally I would spawn more Celery workers, but I am restricted by memory constraints.

Currently, the bottleneck in my process is fetching and downloading the data from the URLs passed into the worker. Roughly, the process looks like this:

def celery_gets_job(url):
    data = fetches_url(url)       # takes 0.1s to 1.0s (bottleneck)
    result = processes_data(data) # takes 0.1s
    return result

This is unacceptable, as the worker is locked up while fetching the URL. I am looking at improving this through threading, but I am unsure what the best practices are.

  • Is there a way to make the celery worker download the incoming data asynchronously while processing the data at the same time in a different thread?

  • Should I have separate workers fetching and processing, with some form of message passing, possibly via RabbitMQ?

damon
Dominic Cabral
  • You can consider using something like [multiprocessing pipes](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Pipe) within a celery task by creating two processes. Of course, your multiprocessing processes should be restricted by a pool. Sharing a fetched URL's large data over RabbitMQ/the result backend would not be a good idea, if I am not mistaken. Celery's low-level APIs may also offer similar functionality. – Sanket Sudake Nov 21 '16 at 19:52
  • I am not aware of RabbitMQ, but I think multiprocessing will be more suitable for you than multithreading, as `celery_gets_job` has multiple non-atomic operations that will create problems when using multithreading. You can use a Queue populated by a pool of processes running `fetches_url(url)`, with another process (or processes) carrying out `processes_data(data)`; a rough sketch of this idea follows these comments. – shrishinde Nov 25 '16 at 17:09
  • This may be what you are looking for: http://stackoverflow.com/questions/28315657/celery-eventlet-non-blocking-requests – fpbhb Nov 27 '16 at 13:19
  • This post https://news.ycombinator.com/item?id=11889549 by the creator of Celery may be what you are looking for. – dyeray Nov 28 '16 at 15:31
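
A rough, standalone sketch of the Queue-plus-process-pool idea from the comment above; `fetches_url` and `processes_data` here are simple stand-ins for the question's functions, and the pool size and example URLs are arbitrary choices:

# sketch: a pool of processes fetches URLs and feeds a queue; a separate process consumes it
import multiprocessing as mp
import urllib.request

def fetches_url(url):
    # stand-in for the question's IO-bound fetch step
    return urllib.request.urlopen(url).read()

def processes_data(data):
    # stand-in for the question's CPU-bound processing step
    return len(data)

def consumer(queue, results):
    # separate process: drain the queue and run the processing step
    while True:
        data = queue.get()
        if data is None:                 # sentinel: the producers are done
            break
        results.append(processes_data(data))

if __name__ == '__main__':
    urls = ['http://example.com'] * 5    # example input
    queue = mp.Queue()
    manager = mp.Manager()
    results = manager.list()

    worker = mp.Process(target=consumer, args=(queue, results))
    worker.start()

    with mp.Pool(processes=4) as pool:   # pool of fetcher processes
        for data in pool.imap_unordered(fetches_url, urls):
            queue.put(data)
    queue.put(None)                      # tell the consumer to stop

    worker.join()
    print(list(results))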

2 Answers


Using the eventlet library, you can patch the standard libraries to make them asynchronous.

First import the async urllib2:

from eventlet.green import urllib2

So you will get the URL body with:

def fetch(url):
    body = urllib2.urlopen(url).read()
    return body

See the eventlet documentation for more examples.
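
As a rough sketch (not part of the original answer), the same green urllib2 call can live inside a Celery task, with the worker started on the eventlet pool so many fetches wait on the network concurrently in one process. The module name, broker URL, and concurrency value below are assumptions:

# sketch: a Celery task using eventlet's green urllib2 (Python 2)
from celery import Celery
from eventlet.green import urllib2

app = Celery('tasks', broker='amqp://guest@localhost//')  # assumed broker URL

@app.task
def celery_gets_job(url):
    data = urllib2.urlopen(url).read()  # green socket: yields to other tasks while waiting on IO
    return len(data)                    # placeholder for the question's processes_data step

The worker is then started with the eventlet execution pool (see the comment below), for example:

    celery -A tasks worker -P eventlet --concurrency=100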

cwallenpoole
otorrillas
  • Also, directly using the eventlet execution pools http://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html should automatically monkey-patch IO calls. – dyeray Nov 28 '16 at 15:28
  • But then wouldn't `processes_data(data)` still block and make the combined result slower than before? – ostrokach Jul 18 '17 at 18:04

I would create two tasks, one for downloading the data and the other for processing it once it is downloaded. This way you could scale the two tasks independently. See: Routing, Chains.
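
A minimal sketch of that split; the queue names, broker URL, and requests-based fetch below are illustrative assumptions, not part of the answer:

# sketch: two tasks on separate queues, chained so the fetched data feeds the processor
import requests
from celery import Celery, chain

app = Celery('tasks', broker='amqp://guest@localhost//')  # assumed broker URL

# route each task to its own queue so fetch and process workers can scale independently
app.conf.task_routes = {
    'tasks.fetch_url': {'queue': 'fetch'},
    'tasks.process_data': {'queue': 'process'},
}

@app.task
def fetch_url(url):
    return requests.get(url).text  # IO-bound download step

@app.task
def process_data(data):
    return len(data)               # placeholder for the real processing step

if __name__ == '__main__':
    # the fetch result is passed to process_data automatically
    result = chain(fetch_url.s('http://example.com'), process_data.s()).apply_async()

Dedicated workers can then be run per queue, for example `celery -A tasks worker -Q fetch -c 20` and `celery -A tasks worker -Q process -c 2`.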

ostrokach
  • Doesn't look like a solution. Workers will still get stuck waiting for IO to complete. The goal is to have one worker download multiple URLs at once. – ogurets Jul 19 '19 at 16:58