I'm trying to speed up a set of nested loops with CPU-bound tasks inside:
def main():
    for i in range(10):
        cpu_bound_task()
        for j in range(10):
            cpu_bound_task()
            for k in range(100):
                cpu_bound_task()
For simplicity, the CPU-bound task is defined as:
def cpu_bound_task():
    _ = [0] * 1000000
Based on a simple adaptation of other posts (this or this), using pool.map() on the innermost loop does the trick, provided that loop is large enough; otherwise the overhead of repeatedly starting a pool seems to defeat the purpose of creating one:
from multiprocessing import Pool

def run_task(_=None):
    # Pool.map passes one item of the iterable per call; cpu_bound_task takes
    # no arguments, so accept and ignore the item.
    cpu_bound_task()

def main_pool():
    for i in range(10):
        cpu_bound_task()
        for j in range(10):
            cpu_bound_task()
            pool = Pool()
            pool.map(func=run_task, iterable=range(100))
            pool.close()
            pool.join()
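As an aside, the pool start-up overhead can be amortised by creating the pool once and reusing it across iterations. A quick sketch (main_pool_reuse is a made-up name; it reuses the run_task wrapper from above):

def main_pool_reuse():
    with Pool() as pool:  # one pool for the whole run, terminated on exit
        for i in range(10):
            cpu_bound_task()
            for j in range(10):
                cpu_bound_task()
                pool.map(func=run_task, iterable=range(100))  # blocks until all 100 finish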
However, I have two additional requirements:
- The first loop must also be run in 3 parallel processes (I am aware this will bring no speed improvement)
- Each iteration of a loop must wait for all the processes launched in that iteration to complete before advancing to the next iteration (a rough sketch of what I mean follows below)
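To make those semantics concrete, here is how I picture the first requirement for the outer loop alone. This is illustrative only (outer_iteration and sketch_outer are made-up names, the batching is deliberately simplistic, and the inner loops are omitted):

from multiprocessing import Process

def outer_iteration(i):
    cpu_bound_task()  # the inner j/k loops would go here

def sketch_outer():
    for start in range(0, 10, 3):  # at most 3 outer iterations in flight
        batch = [Process(target=outer_iteration, args=(i,))
                 for i in range(start, min(start + 3, 10))]
        for p in batch: p.start()
        for p in batch: p.join()  # barrier: wait for the whole batch to finish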
My approach (which might not be the best!) was to create a WaitGroup class whose instances share a queue: they add tasks to it, signal task completion, and wait for the group's tasks to complete. Multiple worker processes then run a run_func() function that grabs tasks from that queue and executes them.
The run_func() function is defined as:
def run_func(q):
    while True:
        task = q.get()
        func, kwargs = task
        if func is None: break  # (None, {}) signals end of all tasks
        func(**kwargs, q=q)
And the WaitGroup class is defined as:
class WaitGroup:
    def __init__(self, group_id, max_p, wg_shared_inputs):
        self.group_id = group_id
        self.max_p = max_p  # maximum number of tasks in flight (None = unlimited)
        self.wait_count = wg_shared_inputs['values'][self.group_id]
        self.cv = wg_shared_inputs['conditions'][self.group_id]

    def add(self, func, kwargs, q):
        '''add a task to the queue, blocking while max_p tasks are in flight'''
        self.cv.acquire()
        if self.max_p:
            while self.wait_count.value >= self.max_p:  # >= keeps at most max_p in flight
                self.cv.wait()  # releases the lock while waiting
        q.put((func, {**kwargs, 'parent_wg': self}))
        self.wait_count.value += 1
        self.cv.release()

    def done(self):
        '''mark a task as completed'''
        self.cv.acquire()
        if self.wait_count.value > 0:
            self.wait_count.value -= 1
            # notify on every decrement (not only at zero), so that add() calls
            # blocked on max_p wake up as soon as a slot frees
            self.cv.notify_all()
        self.cv.release()

    def wait(self):
        '''wait for all tasks in the group to be completed'''
        self.cv.acquire()
        while self.wait_count.value > 0:
            self.cv.wait()
        self.cv.release()
The wg_shared_inputs argument is a plain dict created up front, containing empty manager.Value() and manager.Condition() instances. (Ideally the WaitGroup class would create these instances on demand, but I can't seem to do that, since the WaitGroup instances are passed as arguments to Processes. So I must determine up front how many instances will be needed.)
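For what it's worth, the up-front creation can at least be hidden behind a small helper; make_wg_shared_inputs is a name I made up, built from the same manager calls used in main_q below:

def make_wg_shared_inputs(manager, n_groups):
    # Pre-allocate one shared counter and one condition per wait group,
    # since they must exist before the worker processes are started.
    return {
        'values': manager.dict({g: manager.Value('i', 0) for g in range(n_groups)}),
        'conditions': manager.dict({g: manager.Condition() for g in range(n_groups)}),
    }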
The last step was to split the nested loops into separate step functions, with the main function defined as:
def main_q():
    # Handle manager variables
    from multiprocessing import Manager
    manager = Manager()
    q = manager.Queue()
    values = manager.dict({0: manager.Value('i', 0),
                           1: manager.Value('i', 0),
                           2: manager.Value('i', 0)})
    conditions = manager.dict({0: manager.Condition(),
                               1: manager.Condition(),
                               2: manager.Condition()})
    wg_shared_inputs = {'values': values, 'conditions': conditions}
    # Launch worker processes
    from multiprocessing import Process
    num = 10
    processes = [Process(target=run_func, args=(q,)) for _ in range(num)]
    for p in processes: p.start()
    # Create & launch the top-level wait group (at most 3 step1 tasks in flight)
    wg = WaitGroup(group_id=0, max_p=3, wg_shared_inputs=wg_shared_inputs)
    for i in range(10): wg.add(step1, {'i': i, 'wg_shared_inputs': wg_shared_inputs}, q)
    wg.wait()
    # End the queue
    for _ in range(num): q.put((None, {}))  # signals end of all tasks
    # Join worker processes
    for p in processes: p.join()
And the subsequent steps are defined as:
def step1(i, wg_shared_inputs, q, parent_wg=None):
    cpu_bound_task()
    wg = WaitGroup(group_id=1, max_p=1, wg_shared_inputs=wg_shared_inputs)
    for j in range(10): wg.add(step2, {'i': i, 'wg_shared_inputs': wg_shared_inputs}, q)
    wg.wait()
    parent_wg.done()

def step2(i, wg_shared_inputs, q, parent_wg=None):
    cpu_bound_task()
    wg = WaitGroup(group_id=2, max_p=None, wg_shared_inputs=wg_shared_inputs)
    for k in range(100): wg.add(step3, {'i': i, 'wg_shared_inputs': wg_shared_inputs}, q)
    wg.wait()
    parent_wg.done()

def step3(i, wg_shared_inputs, q, parent_wg=None):
    cpu_bound_task()
    parent_wg.done()
Running the three versions, I get:
SIMPLE VERSION (main())
Completed in 84.85 seconds
POOL VERSION (main_pool())
Completed in 62.62 seconds
QUEUE VERSION (main_q())
Completed in 131.84 seconds
I am surprised by the results. Any ideas why the queue version is so much slower, or how I could achieve my goals with a different methodology?
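For concreteness, the kind of alternative I can imagine (but have not measured) would drive all process-level parallelism from the main process: three coordinating threads, each running one outer iteration, with the actual CPU work submitted to a single shared process pool. All names below are made up, and run_task is the wrapper from main_pool above:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def run_outer(i, procs):
    procs.submit(run_task).result()  # the i-level task, in a worker process
    for j in range(10):  # serial, so each j-iteration is a barrier
        procs.submit(run_task).result()
        list(procs.map(run_task, range(100)))  # wait for all 100 k-tasks

def main_futures():
    with ProcessPoolExecutor() as procs, ThreadPoolExecutor(max_workers=3) as threads:
        futures = [threads.submit(run_outer, i, procs) for i in range(10)]
        for f in futures:
            f.result()  # wait for every outer iteration (and surface errors)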