I'm building a web application that processes ~60,000 (and growing) large files, performs some analysis on each, and returns a "best guess" that needs to be verified by a user. The files will be filtered by category to avoid loading every one, but I'm still left with scenarios where I might have to process 1000+ files at a time.
Each of these large files can take 8-9 seconds to process, so in a 1000+ file situation it is impractical to have a user wait 8 seconds between reviews, or 2+ hours while the files are processed beforehand.
To overcome this, I've decided to use multiprocessing to spawn several workers, each of which picks files from an input queue, processes them, and puts the results into an output queue. Another method polls the output queue and streams each item to the client as it becomes available.
This works well until, partway through, the queue arbitrarily stops returning items. Our environment runs gevent with Django and uWSGI, and I'm aware that creating child processes via multiprocessing under gevent leaves the child with an undesirable event loop state: greenlets spawned before the fork are duplicated in the child. I've therefore decided to use gipc to handle the child processes.
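For reference, gipc's intended pattern, as I understand it, is to spawn children with gipc.start_process() and exchange data over the gevent-cooperative pipe ends returned by gipc.pipe(), rather than over multiprocessing primitives. A minimal sketch, modeled on gipc's documented example (the names are mine):

import gipc

def readchild(reader):
    # Runs in a child process created with a fresh gevent state.
    print 'Child got: %s' % reader.get()

if __name__ == '__main__':
    with gipc.pipe() as (reader, writer):
        # Pipe ends must be handed to the child through start_process().
        process = gipc.start_process(target=readchild, args=(reader,))
        writer.put('hello from the parent')
        process.join()

In my code, however, I've kept multiprocessing.Queue for the work queues and use gipc only for process creation.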
An example of my code (I cannot post my actual code):
import multiprocessing
import Queue

import gipc

from item import Item

MAX_WORKERS = 10


class ProcessFiles(object):

    def __init__(self):
        self.input_queue = multiprocessing.Queue()
        self.output_queue = multiprocessing.Queue()
        self.file_count = 0

    def query_for_results(self):
        # Query the db for records of files to process.
        # Return the results and set self.file_count to
        # the number of records returned.
        pass

    def no_items_remain(self):
        # Return True once the input queue has been drained.
        pass

    # The subprocess.
    def worker(self):
        # Chisel away at the input queue until no items remain.
        while True:
            if self.no_items_remain():
                return
            item = self.input_queue.get()
            item.process()
            self.output_queue.put(item)

    def start(self):
        # Get results and store them in the input queue for processing.
        results = self.query_for_results()
        for result in results:
            item = Item(result)
            self.input_queue.put(item)

        # Spawn workers to process the files.
        for _ in xrange(MAX_WORKERS):
            gipc.start_process(self.worker)

        # Poll for items to send to the client.
        return self.get_processed_items()

    def get_processed_items(self):
        # Wait for the output queue to hold at least one item.
        # When an item becomes available, yield it to the client.
        count = 0
        while count != self.file_count:
            # item = self._get_processed_item()
            # Debugging:
            try:
                item = self.output_queue.get(timeout=1)
            except Queue.Empty:
                print '\tError fetching processed item. Retrying...'
                continue

            if item:
                print 'QUEUE COUNT: {}'.format(self.output_queue.qsize())
                count += 1
                yield item

        yield 'end'
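For context, the Django side consumes this generator and streams each item to the client roughly like this (simplified; the view and the str() serialization are stand-ins, not my actual code):

from django.http import StreamingHttpResponse

def review_files(request):
    # Stream each processed item to the client as it becomes available.
    processor = ProcessFiles()
    return StreamingHttpResponse(str(item) for item in processor.start())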
I expect the output to show the queue's current size after each item is processed and yielded:
QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
...
QUEUE COUNT: 4
QUEUE COUNT: 3
QUEUE COUNT: 2
QUEUE COUNT: 1
However, the script only manages to yield a few items before failing:
QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
Error fetching processed item. Retrying...
Error fetching processed item. Retrying...
Error fetching processed item. Retrying...
Error fetching processed item. Retrying...
Error fetching processed item. Retrying...
Error fetching processed item. Retrying...
...
My question is: What exactly is happening? Why can't I get the remaining items from the queue? How can I avoid this and return the items I expect?