I'm trying to speed up a nested loop with inner CPU-bound tasks:

def main():
    for i in range(10):
        cpu_bound_task()
        for j in range(10):
            cpu_bound_task()
            for k in range(100):
                cpu_bound_task()

For simplicity, the CPU-bound task is defined as:

def cpu_bound_task():
    _ = [0]*1000000

Based on a simple derivation from other posts (this or this), using pool.map() on the innermost loop would do the trick (provided that loop is large enough; otherwise the overhead of starting a pool seems to defeat the purpose):

from multiprocessing import Pool

def main_pool():
    for i in range(10):
        cpu_bound_task()
        for j in range(10):
            cpu_bound_task()
            pool = Pool()
            pool.map(func=cpu_bound_task, iterable=()*100)
            pool.close()
            pool.join()

However, I have 2 additional requirements:

  1. The first loop must also be run in 3 parallel processes (I am aware this will have no speed improvement)
  2. Each iteration of a loop must wait for all the processes in that iteration to complete, before advancing to the next iteration

My approach (which might not be the best!) involved creating a WaitGroup class whose instances would establish a connection with a shared queue (adding tasks to that queue, signalling task completion, and waiting for the group's processes to complete). Then, multiple processes would run a run_func() function that would grab tasks from that queue and execute them.

The run_func() function is defined as:

def run_func(q):
    while True:
        task = q.get()          # blocks until a task is available
        func, kwargs = task
        if func is None: break  # (None, {}) signals end of all tasks
        func(**kwargs, q=q)     # pass the queue along so tasks can enqueue sub-tasks

And the WaitGroup class is defined as:

class WaitGroup():
    
    def __init__(self, group_id, max_p, wg_shared_inputs):
        self.group_id = group_id
        self.max_p = max_p # maximum number of unfinished tasks allowed in the queue at once
        self.wait_count = wg_shared_inputs['values'][self.group_id]
        self.cv = wg_shared_inputs['conditions'][self.group_id]
    
    def add(self, func, kwargs, q):
        '''add task to the queue'''
        self.cv.acquire()
        if self.max_p:
            while self.wait_count.value >= self.max_p: # >= or >, check
                self.cv.wait() # releases lock automatically
        q.put((func,{**kwargs,'parent_wg':self}))
        self.wait_count.value += 1
        self.cv.release()
    
    def done(self):
        '''mark task as completed'''
        self.cv.acquire()
        if self.wait_count.value > 0:
            self.wait_count.value -= 1
        if self.wait_count.value == 0:
            self.cv.notify_all()
        self.cv.release()
    
    def wait(self):
        '''wait for a group of tasks to be completed'''
        self.cv.acquire()
        while self.wait_count.value > 0:
            self.cv.wait()
        self.cv.release()

The wg_shared_inputs is a simple dict created upfront containing empty instances of manager.Value() and manager.Condition(). (Ideally these instances would be created by the WaitGroup class when needed, but unfortunately I can't seem to do that since the WaitGroup instances are passed as arguments to Processes, so I must determine upfront how many instances will be needed.)

The last step was to split the loop into multiple steps, with the main function defined as:

def main_q():
    
    # Handle Manager Variables
    from multiprocessing import Manager
    manager = Manager()
    q = manager.Queue()
    values = manager.dict({0:manager.Value('i',0), 1:manager.Value('i',0), 2:manager.Value('i',0)})
    conditions = manager.dict({0:manager.Condition(), 1:manager.Condition(), 2:manager.Condition()})
    wg_shared_inputs = {'values':values, 'conditions':conditions}
    
    # Launch Processes
    from multiprocessing import Process
    num = 10
    processes = [Process(target=run_func, args=(q,)) for _ in range(num)]
    for p in processes: p.start()
        
    # Create & Launch Wait Group
    wg = WaitGroup(group_id=0, max_p=3, wg_shared_inputs=wg_shared_inputs)
    for i in range(20): wg.add(step1, {'i':i, 'wg_shared_inputs':wg_shared_inputs}, q)
    wg.wait()

    # End Queue
    for _ in range(num): q.put((None,{})) # signals end of all tasks
        
    # Join Processes
    for p in processes: p.join()

And the subsequent steps defined as:

def step1(i, wg_shared_inputs, q, parent_wg=None):
    cpu_bound_task()
    wg = WaitGroup(group_id=1, max_p=1, wg_shared_inputs=wg_shared_inputs)
    for j in range(10): wg.add(step2, {'i':i, 'wg_shared_inputs':wg_shared_inputs}, q)
    wg.wait()
    parent_wg.done()
    
def step2(i, wg_shared_inputs, q, parent_wg=None):
    cpu_bound_task()
    wg = WaitGroup(group_id=2, max_p=None, wg_shared_inputs=wg_shared_inputs)
    for k in range(100): wg.add(step3, {'i':i, 'wg_shared_inputs':wg_shared_inputs}, q)
    wg.wait()
    parent_wg.done()

def step3(i, wg_shared_inputs, q, parent_wg=None):
    cpu_bound_task()
    parent_wg.done()

Running the 3 different options I get:

SIMPLE VERSION (main())
Completed in 84.85 seconds

POOL VERSION (main_pool())
Completed in 62.62 seconds

QUEUE VERSION (main_q())
Completed in 131.84 seconds

I am surprised at the results. Any ideas why the queue version is so much slower? Or any ideas how I can achieve my goals with a different methodology?

Mike

  • This seems like a classic use case for a generator. Make your loop generating the tasks `yield` them, and then you can pass that stream into pmap. (That said, personally, I wouldn't choose Python at all here; lots of languages available -- Julia, Clojure, Scala, etc. -- that are comparably high-level but without the GIL or otherwise with a faster runtime than CPython; Clojure's default approach of having two predefined pools for background tasks -- one for I/O-bound tasks, another for CPU-bound tasks -- seems particularly appropriate here.) – Charles Duffy Jul 18 '21 at 23:03
  • ...either way, using a language without a GIL lets you use multithreading instead of multiprocessing, so all the serialization/deserialization overhead goes away. – Charles Duffy Jul 18 '21 at 23:03
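
For reference, a minimal sketch of the generator idea from the first comment, assuming Pool.imap_unordered plays the role of pmap (the names task_stream and main_gen and the chunksize value are illustrative, not from the original post):

from multiprocessing import Pool

def cpu_bound_task():
    _ = [0]*1000000

def cpu_bound_task_adapter(index):
    cpu_bound_task()

def task_stream():
    # flatten the nested loops into one lazy stream of task indices
    for i in range(10):
        yield i
        for j in range(10):
            yield j
            for k in range(100):
                yield k

def main_gen():
    with Pool() as pool:
        # imap_unordered consumes the generator lazily and yields results as workers finish
        for _ in pool.imap_unordered(cpu_bound_task_adapter, task_stream(), chunksize=50):
            pass

Note that this flattens everything into a single stream, so it does not by itself honour the two extra requirements (the 3-process cap and the per-iteration barrier); it only illustrates streaming the generated tasks into one pool.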

1 Answer


First, your code for main_pool is not quite correct: iterable=()*100 is equivalent to iterable=(), so map is handed an empty iterable and cpu_bound_task is never actually executed by the pool. A modified version still using map could be:

from multiprocessing import Pool

def cpu_bound_task():
    _ = [0]*1000000

def cpu_bound_task_adapter(index):
    # map needs a function of one argument, so wrap the task
    return cpu_bound_task()

def main_pool():
    for i in range(10):
        cpu_bound_task()
        for j in range(10):
            pool = Pool()
            pool.map(func=cpu_bound_task_adapter, iterable=range(100))
            pool.close()
            pool.join()

On my desktop your SIMPLE VERSION (main()) completes in 67.9 seconds and your corrected POOL VERSION (main_pool()) completes in 46.2 seconds (I have 8 logical, 4 physical cores).

But there is no reason to create and tear down the pool repeatedly inside the loop. Instead, create the pool once at the outset:

def main_pool():
    pool = Pool()
    for i in range(10):
        cpu_bound_task()
        for j in range(10):
            pool.map(func=cpu_bound_task_adapter, iterable=range(100))
    pool.close()

This now ran in 20.9 seconds.

If I modify your code to explicitly specify a pool size of 3, the modified version completes in 28.4 seconds.
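
The answer does not show its exact modification, but a minimal sketch of passing a pool size of 3, reusing cpu_bound_task_adapter from above (the name main_pool_3 is illustrative), could be:

from multiprocessing import Pool

def main_pool_3():
    # cap the pool at 3 worker processes instead of one per logical core
    pool = Pool(processes=3)
    for i in range(10):
        cpu_bound_task()
        for j in range(10):
            # map blocks until all 100 tasks of this iteration are finished,
            # so each iteration acts as its own barrier
            pool.map(func=cpu_bound_task_adapter, iterable=range(100))
    pool.close()
    pool.join()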

Booboo