I have a pretty plain vanilla implementation of concurrent.futures.ProcessPoolExecutor -- something like this (using Python 3.6):
files = get_files()
processor = get_processor_instance()
with concurrent.futures.ProcessPoolExecutor() as executor:
    list(executor.map(processor.process, files))
The processor is an instance of one of several available processor classes, all of which share the process method, which looks roughly like this:
def process(self, file):
    log.debug(f"Processing source file {file.name}.")
    with DBConnection(self.db_url) as session:
        file = session.merge(file)
        session.refresh(file)
        self._set_file(file)
        timer = perf_counter()
        try:
            self.records = self._get_records()
            self._save_output()
        except Exception as ex:
            log.warning(f"Failed to process source file {file.ORIGINAL_NAME}: {ex}")
            self.error_time = time.time()
            self.records = None
        else:
            process_duration = perf_counter() - timer
            log.info(f'File {file.name} processed in {process_duration:.6f} seconds.')
            file.process_duration = process_duration
            session.commit()
The implementations of the _get_records and _save_output methods vary per class, but my problem is with the handling of errors. I'm deliberately testing with input that makes one of those two methods run out of memory, and I would expect the except block above to catch the error and move on to the next file -- and this is precisely what happens when I run the code in a single process.
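For reference, the single-process run I'm comparing against is essentially just a plain loop over the same objects (shown here only to illustrate the baseline behaviour):

# Single-process baseline: a MemoryError raised inside process()
# is caught by its own except block, and the loop moves on.
files = get_files()
processor = get_processor_instance()
for file in files:
    processor.process(file)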
If I use ProcessPoolExecutor as described above, it raises a BrokenProcessPool exception and kills all execution:
Traceback (most recent call last):
  File "/vagrant/myapp/myapp.py", line 94, in _process
    list(executor.map(processor.process, files))
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
I can of course catch BrokenProcessPool in the calling code, but I'd prefer to handle the error internally and proceed to the next file.
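What I mean by catching it in the calling code is roughly the following (just a sketch -- and it doesn't really help, because once the pool is broken the remaining files are never processed):

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

try:
    with ProcessPoolExecutor() as executor:
        list(executor.map(processor.process, files))
except BrokenProcessPool as ex:
    # The pool is unusable at this point; the whole batch is aborted.
    log.error(f"Process pool broke: {ex}")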
I also tried using the standard multiprocessing.Pool object, like this:
with multiprocessing.Pool() as pool:
    pool.map(processor.process, files)
In this case, the behaviour is even weirder: after starting to process the first two files, which raise the out-of-memory error, it moves on to the later files, which are smaller and get processed completely. However, the except block apparently never gets triggered (no log messages, no error_time), and the application just hangs, neither finishing nor doing anything, until killed manually.
I was hoping that the try..except block would make each worker process self-contained, handling its own errors without affecting the main application. Any ideas on how to achieve that?
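To make the intent concrete, this is roughly the behaviour I'm after (safe_process is a hypothetical wrapper name, not something in my current code). I realise this can only work if Python itself gets to raise MemoryError inside the worker; if the OS kills the worker outright, the pool presumably still breaks:

import functools

def safe_process(processor, file):
    """Top-level wrapper so the worker returns errors instead of raising them."""
    try:
        processor.process(file)
        return (file.name, None)
    except Exception as ex:
        return (file.name, repr(ex))

with concurrent.futures.ProcessPoolExecutor() as executor:
    # functools.partial keeps the call picklable, same as the bound method above.
    worker = functools.partial(safe_process, processor)
    for name, error in executor.map(worker, files):
        if error:
            log.warning(f"Worker failed on {name}: {error}")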