Using the method `multiprocessing.pool.Pool.imap` is a step in the right direction, but the problem is that with so much input you will be feeding the input task queue faster than the processing pool can take tasks off the queue and return results. Consequently, the task queue will keep growing and you will exhaust memory. What is needed is a way to "throttle" `imap` so that it blocks once N tasks have been submitted whose results have not yet been retrieved. I think a reasonable default for N is twice the pool size: that ensures that when a pool process finishes a task, there is no delay before it finds another task to work on. Hence we create the classes `BoundedQueueProcessPool` (multiprocessing) and `BoundedQueueThreadPool` (multithreading):
import multiprocessing.pool
import multiprocessing
import threading
class ImapResult():
    """Iterator wrapper that frees one semaphore slot per result consumed.

    Wraps the iterator returned by ``Pool.imap`` / ``Pool.imap_unordered``.
    A slot is released each time ``__next__`` either returns a result or
    re-raises the exception a task raised, so a producer blocked on the
    semaphore can submit another task.
    """

    def __init__(self, semaphore, result):
        # semaphore: any object with a ``release()`` method.
        # result: iterable of task results.
        self._semaphore = semaphore
        self.it = iter(result)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            elem = next(self.it)
        except StopIteration:
            # Exhausted: no task result was consumed, so nothing to release.
            raise
        except Exception:
            # The task itself raised; its slot is consumed all the same.
            # Pool workers only capture ``Exception`` subclasses, so a bare
            # ``except:`` would wrongly release a slot for e.g. a
            # KeyboardInterrupt raised while we were blocked waiting.
            self._semaphore.release()
            raise
        self._semaphore.release()
        return elem
class BoundedQueuePool:
    """Mixin that bounds how many tasks a pool may have outstanding.

    Intended to precede a ``multiprocessing.pool.Pool`` subclass in the MRO,
    so that ``super()`` calls reach the real pool methods. A semaphore with
    ``limit`` slots is acquired once per submitted task. It is released once
    per finished task: from the result callbacks for ``apply_async``, or as
    results are consumed for ``imap`` / ``imap_unordered``. Submitting more
    than ``limit`` outstanding tasks therefore blocks the submitter.
    """

    def __init__(self, limit, semaphore):
        self._limit = limit
        self._semaphore = semaphore

    def release(self, result, callback=None):
        """Free one task slot, then pass *result* to *callback* if given."""
        self._semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds=None, callback=None,
                    error_callback=None):
        """Like ``Pool.apply_async`` but blocks while the pool is full.

        *callback* receives the result on success, and *error_callback*
        receives the exception on failure. Either way, the task's slot is
        freed first.
        """
        if kwds is None:
            kwds = {}
        self._semaphore.acquire()

        def on_success(result):
            self.release(result, callback=callback)

        def on_error(exc):
            # Forward failures to the *error* callback, not the success one.
            self.release(exc, callback=error_callback)

        try:
            return super().apply_async(func, args, kwds,
                                       callback=on_success,
                                       error_callback=on_error)
        except Exception:
            # The task was never submitted (e.g. pool not running), so no
            # callback will ever free the slot we just took.
            self._semaphore.release()
            raise

    def _check_chunksize(self, chunksize):
        # The pool must pull a whole chunk (one slot per element) before it
        # can submit it. A chunk larger than the limit could never be
        # completed, so the producer would block forever.
        if chunksize > self._limit:
            raise ValueError(f'chunksize argument exceeds {self._limit}')

    def _bounded_iterable(self, iterable):
        # Take one slot per element before handing it to the pool; the
        # ImapResult wrapper gives the slot back when the result is consumed.
        for elem in iterable:
            self._semaphore.acquire()
            yield elem

    def imap(self, func, iterable, chunksize=1):
        """Like ``Pool.imap`` but blocks input consumption while full.

        Raises:
            ValueError: if *chunksize* exceeds the pool's task limit.
        """
        self._check_chunksize(chunksize)
        result = super().imap(func, self._bounded_iterable(iterable), chunksize)
        return ImapResult(self._semaphore, result)

    def imap_unordered(self, func, iterable, chunksize=1):
        """Like ``Pool.imap_unordered`` but blocks input consumption while full.

        Raises:
            ValueError: if *chunksize* exceeds the pool's task limit.
        """
        self._check_chunksize(chunksize)
        result = super().imap_unordered(func, self._bounded_iterable(iterable),
                                        chunksize)
        return ImapResult(self._semaphore, result)
class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    """Process pool whose apply_async/imap/imap_unordered block when full.

    Accepts the same arguments as ``multiprocessing.pool.Pool``, plus the
    keyword-only *max_waiting_tasks*. This is the number of tasks allowed
    beyond one per worker process; it defaults to the number of processes.
    At most ``processes + max_waiting_tasks`` tasks are outstanding at once.

    Raises:
        ValueError: if *max_waiting_tasks* is negative.
    """

    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        try:
            if max_waiting_tasks is None:
                max_waiting_tasks = self._processes
            elif max_waiting_tasks < 0:
                raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
            limit = self._processes + max_waiting_tasks
            BoundedQueuePool.__init__(self, limit, multiprocessing.BoundedSemaphore(limit))
        except BaseException:
            # Pool.__init__ has already started the worker processes; shut
            # them down rather than leaking a running pool on a bad argument.
            self.terminate()
            raise
class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
    """Thread pool whose apply_async/imap/imap_unordered block when full.

    Accepts the same arguments as ``multiprocessing.pool.ThreadPool``, plus
    the keyword-only *max_waiting_tasks*. This is the number of tasks allowed
    beyond one per worker thread; it defaults to the number of threads.
    At most ``processes + max_waiting_tasks`` tasks are outstanding at once.

    Raises:
        ValueError: if *max_waiting_tasks* is negative.
    """

    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        try:
            if max_waiting_tasks is None:
                max_waiting_tasks = self._processes
            elif max_waiting_tasks < 0:
                raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
            limit = self._processes + max_waiting_tasks
            BoundedQueuePool.__init__(self, limit, threading.BoundedSemaphore(limit))
        except BaseException:
            # ThreadPool.__init__ has already started the worker threads;
            # shut them down rather than leaking a running pool.
            self.terminate()
            raise
#######################################################################
from time import sleep
def process_line(line, delay=3):
    """Simulate work on one input line: wait, echo the line, report success.

    Args:
        line: a line read from a text file. It already carries its line
            terminator, so it is printed with ``end=''``.
        delay: seconds to sleep to simulate processing time (default 3).

    Returns:
        True, always.
    """
    sleep(delay)
    # the lines already have line end characters:
    print(line, end='')
    return True
if __name__ == "__main__":
    # Demo: stream test.txt through a 2-process bounded pool. By default at
    # most 2 running + 2 waiting lines are outstanding at any moment.
    pool = BoundedQueueProcessPool(2)
    with open("test.txt") as file:
        results = pool.imap(process_line, file)
        for _ in results:
            # Results are discarded; consuming them is what frees pool slots.
            pass
    pool.close()
    pool.join()