I am running a few intensive parallel computations using multiprocessing, abstracted in runcode() below as a random-length sleep. As with the sleep, it is hard to say in advance how long any given run will take. After starting the first batch of runs (the master runs), I wait for them to finish. Often a subset of them finishes quickly and I am left waiting on a few slow ones. Instead of sitting idle, I would like to commission a second set of runs to 'kill time'. But once the master runs finish, I need to kill that second set regardless of completion (i.e. pool.terminate()) and get on with the rest of my code. I can't blindly terminate the entire pool because I have a listener function that handles all reading and writing and needs to stay active at all times. There are two ideas I have thought of:
1 - Add the second set of runs to the same pool and selectively kill only them. But I wasn't sure how to do so selectively, perhaps through some kind of tag (the closest thing I could come up with is sketched right after this list).
2 - Make a second pool, run the second set there, and blindly terminate that pool once all the master runs complete. This is what I have partially shown below (the lines marked with '###'). I remember reading a while back that having multiple pools is not a good idea, but I was unable to find the source again.
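For idea 1, the closest thing I could sketch is not a real kill but a cooperative stop flag that the secondary runs poll and then exit early. The names run_secondary and stop_event below are made up, and this only works if the real computation can check the flag between chunks of work:

import multiprocessing
import random
import time

def run_secondary(q, stop_event):
    '''Secondary run that gives up early once stop_event is set.'''
    target = random.random() * 10           # same stand-in workload as runcode()
    waited = 0.0
    while waited < target:
        if stop_event.is_set():             # master runs are done -> abandon this run
            return
        time.sleep(0.5)                     # one "chunk" of the fake work
        waited += 0.5
    q.put([1, str(target) + '\n'])          # only report a result if it actually finished

# Usage with the existing pool/manager, instead of terminating anything:
# stop_event = manager.Event()
# pool.apply_async(run_secondary, (q, stop_event))   # a secondary run
# ...
# stop_event.set()                                   # "kill" all secondary runs at once

I am not convinced this counts as selectively killing them, since a run stuck inside a long uninterruptible step would not notice the flag, which is partly why I am asking.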
Both ideas share another issue. I could blindly commission MANY MANY runs for the second set and hope that the master runs finish before the secondary ones do. Ideally, though, I would commission secondary runs only while the master runs have NOT all completed AND there are cores available (nicknamed 'ThereAreFreeCores()' in the code below; a rough sketch of how I imagined that check is at the bottom of this post). Is there a way to do this?
import multiprocessing
import multiprocessing.pool
from contextlib import ExitStack
import time
import random
class BoundedQueuePool:
    def __init__(self, limit, semaphore_type):
        self._semaphore = semaphore_type(limit)

    def release(self, result, callback=None):
        self._semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        self._semaphore.acquire()
        callback_fn = self.release if callback is None else lambda result: self.release(result, callback=callback)
        error_callback_fn = self.release if error_callback is None else lambda result: self.release(result, callback=error_callback)
        return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)


class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        limit = self._processes + max_waiting_tasks
        BoundedQueuePool.__init__(self, limit, multiprocessing.BoundedSemaphore)
def listener(q, csv_names):
    '''
    Listens for results from each run and prints them to file.
    '''
    with ExitStack() as stack:
        files = {key: stack.enter_context(open(csv_names[key], "w")) for key in csv_names}
        for key in csv_names:
            files[key].write('Value\n')
            files[key].flush()
        while True:
            m = q.get()
            if m == 'STOP':
                break
            ff = m[0]
            files[ff].write(m[1])
            files[ff].flush()
    return
def runcode(q):
    x = random.random()
    time.sleep(x*10)
    q.put([1, str(x)+'\n'])
    return
iterlimit = 40
csv_names = {1: "./file1.csv"}
count = 0

manager = multiprocessing.Manager()
q = manager.Queue()
pool = BoundedQueueProcessPool(multiprocessing.cpu_count() + 2)
pool2 = BoundedQueueProcessPool(multiprocessing.cpu_count() + 2)  ###

pool.apply_async(listener, (q, csv_names))  # Activating the listener - handles read and write

while count <= iterlimit:  # Let's call these the master runs
    count += 1
    pool.apply_async(runcode, (q,))

while len(pool._cache) > 1:  # Waiting for the master runs to finish (the listener keeps one entry in the cache, hence > 1)
    # New chunk I want to add:
    # if ThereAreFreeCores():                   ###
    #     pool2.apply_async(runcode, (q,))      ### Let's call these the secondary runs
    continue

# Now ALL master runs are finished but secondary runs MIGHT not be done
pool.close()
pool2.terminate()  ###
q.put('STOP')
pool.join()
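For what it's worth, this is roughly how I imagined approximating ThereAreFreeCores(), assuming I keep the AsyncResult handles returned by the master apply_async calls. The names there_are_free_cores and master_results are made up and not part of the code above:

def there_are_free_cores(master_results, n_cores):
    '''Crude check: are fewer master runs still outstanding than there are cores?'''
    unfinished = sum(1 for r in master_results if not r.ready())
    return unfinished < n_cores

# The master loop would need to keep the handles:
# master_results = [pool.apply_async(runcode, (q,)) for _ in range(iterlimit)]
#
# and the waiting loop could then become something like:
# while any(not r.ready() for r in master_results):
#     if there_are_free_cores(master_results, multiprocessing.cpu_count()):
#         pool2.apply_async(runcode, (q,))
#     time.sleep(0.1)

I am not sure whether counting unfinished AsyncResults is a sensible proxy for free cores (the OS schedules the processes, not the pool), and once pool2's bounded queue fills up its apply_async will block this loop, so I may be missing a cleaner way to do this.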