With multiprocessing, dealing with exceptions that occur within a single job is slightly awkward. If you use the map variants, you need to be careful in how you poll for results, otherwise you might lose some if the map function is forced to raise an exception. Further, you won't even know which job was the problem unless you add special handling for any exceptions within your job. If you use the apply variants, you don't need to be as careful when getting your results, but collating the results becomes a bit more tricky.
Overall, I think map is the easiest to get working though.
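For comparison, here is a minimal sketch of the apply route, assuming a hypothetical convert_file that fails for one input. Each apply_async call returns its own AsyncResult, so keeping them in a dict keyed by filename tells you which job an exception belongs to. The helper name collate is only illustrative and is not part of multiprocessing.

```python
from multiprocessing import Pool


def convert_file(filename):
    """Hypothetical stand-in for real work; fails for one input."""
    if filename == 'file2.txt':
        raise ValueError('cannot convert ' + filename)
    return filename.upper()


def collate(pending):
    """Split a {key: AsyncResult} mapping into (results, failures).

    get() re-raises only the exception from that one job, and the key
    says which job it was, so no wrapper exception is needed.
    """
    results, failures = {}, {}
    for key, async_result in pending.items():
        try:
            results[key] = async_result.get()
        except Exception as ex:
            failures[key] = ex
    return results, failures


def main():
    files = ['file{}.txt'.format(i) for i in range(5)]
    with Pool() as pool:
        # one AsyncResult per job, remembered under its filename
        pending = {f: pool.apply_async(convert_file, (f,)) for f in files}
        # collect inside the with block, while the workers are still alive
        results, failures = collate(pending)
    print('completed:', sorted(results))
    print('failed:', sorted(failures))


if __name__ == '__main__':
    main()
```

The catch is that get() is called in submission order, so a slow early job delays handling of results that finished sooner. That is part of why collating is trickier here than with imap_unordered.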
First, you need a special exception. Define it in its own module rather than in your main module, otherwise Python can have trouble serialising and deserialising it correctly.
For example:
custom_exceptions.py
class FailedJob(Exception):
    pass
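Whichever module it lives in, the exception reaches the parent by being pickled in the worker and unpickled in the parent. The sketch below does that round trip by hand to show what survives: args does, while the exception chained with `from` does not. As far as I know, Pool separately attaches the worker's formatted traceback as text, so the reason still shows up in the parent's traceback, but if you need it programmatically it is safer to pass it in args. The helper make_failed_job is a made-up name for illustration.

```python
import pickle


class FailedJob(Exception):
    # Defined inline only to keep the sketch self-contained; in the real
    # program this class is imported from custom_exceptions.
    pass


def make_failed_job(filename):
    """Build a FailedJob chained to an underlying error and return it."""
    try:
        try:
            raise ValueError('bad input')  # stand-in for the real failure
        except Exception as ex:
            raise FailedJob(filename) from ex
    except FailedJob as failed:
        return failed


original = make_failed_job('file2.txt')
restored = pickle.loads(pickle.dumps(original))

print(restored.args)       # the filename survives the round trip
print(original.__cause__)  # the chained ValueError exists before pickling
print(restored.__cause__)  # plain pickling does not carry it across
```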
main.py
from multiprocessing import Pool
import time
import random

from custom_exceptions import FailedJob


def convert_file(filename):
    # pseudo implementation to demonstrate what might happen
    if filename == 'file2.txt':
        time.sleep(0.5)
        raise Exception
    elif filename == 'file0.txt':
        time.sleep(0.3)
    else:
        time.sleep(random.random())
    return filename  # return filename, so we can identify the job that was completed


def job(filename):
    """Wraps any exception that occurs with FailedJob so we can identify which job failed
    and why"""
    try:
        return convert_file(filename)
    except Exception as ex:
        raise FailedJob(filename) from ex


def main():
    chunksize = 4  # number of jobs before dispatch
    total_jobs = 20
    files = ['file{}.txt'.format(i) for i in range(total_jobs)]
    with Pool() as pool:
        # we use imap_unordered as we don't care about order, we want the result of the
        # jobs as soon as they are done
        iter_ = pool.imap_unordered(job, files)
        while True:
            completed = []
            while len(completed) < chunksize:
                # collect results from iterator until we reach the dispatch threshold
                # or until all jobs have been completed
                try:
                    result = next(iter_)
                except StopIteration:
                    print('all child jobs completed')
                    # only break out of inner loop, might still be some completed
                    # jobs to dispatch
                    break
                except FailedJob as ex:
                    print('processing of {} job failed'.format(ex.args[0]))
                else:
                    completed.append(result)
            if completed:
                print('completed:', completed)
                # put your dispatch logic here
            if len(completed) < chunksize:
                print('all jobs completed and all job completion notifications'
                      ' dispatched to central server')
                return


if __name__ == '__main__':
    main()
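If the nested loops in main() feel hard to follow, the batching can be pulled out into a generator. This is only a sketch of the same idea, not a drop-in replacement: batches and FakeResults are made-up names, and FakeResults is an in-process stand-in for the pool iterator so the example runs without any workers.

```python
class FailedJob(Exception):
    """Stand-in for custom_exceptions.FailedJob so the sketch runs on its own."""


def batches(results, chunksize, on_failure):
    """Yield lists of up to `chunksize` successful results from `results`.

    `results` is an iterator whose next() may raise FailedJob for a single
    job and then carry on, which is how the iterator returned by
    pool.imap_unordered is used in main() above.
    """
    completed = []
    while True:
        try:
            result = next(results)
        except StopIteration:
            break
        except FailedJob as ex:
            on_failure(ex)
        else:
            completed.append(result)
            if len(completed) == chunksize:
                yield completed
                completed = []
    if completed:
        # final, possibly short, batch
        yield completed


class FakeResults:
    """Hypothetical stand-in for the pool iterator (no worker processes)."""

    def __init__(self, outcomes):
        self._outcomes = iter(outcomes)

    def __iter__(self):
        return self

    def __next__(self):
        outcome = next(self._outcomes)
        if isinstance(outcome, Exception):
            raise outcome  # simulate one failed job, later ones still arrive
        return outcome


if __name__ == '__main__':
    outcomes = ['file0.txt', FailedJob('file2.txt'), 'file1.txt', 'file3.txt']
    failed = []
    for completed in batches(FakeResults(outcomes), 2, failed.append):
        print('completed:', completed)  # put your dispatch logic here
    print('failed:', [ex.args[0] for ex in failed])
```

With a real pool you would pass iter_ in place of FakeResults(outcomes) and keep the for loop inside the with Pool() block.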