I'm running a Python 2.7 script that processes many thousands of files and documents, using 16 worker processes to consume jobs from a JoinableQueue. We've run into an issue where some of the file/folder data being processed is corrupt. All jobs do seem to complete eventually, but the ones with corrupt data take a very long time. The slow processing actually happens inside an external library, so once it starts, the worker simply has to wait for the library call to return.
I don't want to kill the long-running processes, but when a job has been running for more than 30 seconds or a minute, I'd like to log a message identifying which job is taking longer than expected.
The main process is blocked on queue.join(). How can I monitor the state of the processing while it is? Is the best approach to start an asynchronous background timer each time a document job begins, or is there a better way? (A rough sketch of the timer idea follows the skeleton below.)
I've removed most of the code, but a skeleton of what we're doing is the following:
import multiprocessing as mp

def processDoc(id):
    # Do processing (this is where the external library is called)
    pass

def doprocessing(queue):
    while True:
        item = queue.get()
        try:
            processDoc(item["id"])
        except Exception:
            # Swallow per-document errors so the worker keeps going
            pass
        queue.task_done()

queue = mp.JoinableQueue()
for doc in doclist:
    queue.put(doc)

processes = [mp.Process(target=doprocessing, args=[queue]) for i in range(nb_workers)]
for p in processes:
    p.start()

queue.join()

for p in processes:
    p.terminate()
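
The per-job timer approach I'm considering would look roughly like the sketch below, inside each worker. This is only a sketch: processDocWithWarning and the 60-second threshold are placeholder names/values I made up for illustration.

import logging
import threading

def processDocWithWarning(item, threshold=60):
    # Hypothetical wrapper around processDoc; threshold is an assumed value.
    def warn():
        logging.warning("Job %s has been running for more than %s seconds",
                        item["id"], threshold)

    # Fire a warning if the job is still running after `threshold` seconds
    timer = threading.Timer(threshold, warn)
    timer.start()
    try:
        processDoc(item["id"])
    finally:
        # Cancel the timer if the job finished before the threshold
        timer.cancel()

The worker loop would then call processDocWithWarning(item) instead of processDoc(item["id"]) inside the try block. Is that a reasonable way to do it, or is there a cleaner pattern for this?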