0

I am running a few intensive parallel computations using multiprocessing. I have abstracted that in runcode() as a sleep function. Similar to the function, it is hard to say how long any of the runs can take. After starting the first batch of runs (master), i wait for them to finish. Often i realize that a subset of it finishes running and I am waiting on a few slower ones. Instead of waiting and doing nothing, I wanted to comission a second set of runs to 'kill time'. But once the master runs finish, i will to kill the second set of runs regardless of completion(i.e. pool.terminate()) and get on with the rest of my code. I cant blindly terminate the entire pool as i have a listener function that handles reading and writing that needs to be active at all times. There were 2 ideas i thought of.

1 - Add the second set of runs on the same pool and selectively kill them. But I wasnt sure how to do so selectively, perhaps through some tag.

2 - Make a 2nd pool and run the second set there and i can blindly terminate that when all the master runs comeplete. This is what i have partially shown below (these are the lines with '###'). I remember reading somewhere a while back that having multiple pools is not a good idea but was unable to find it again.

Both ideas have another common issue. I could blindly comission MANY MANY runs for the second set and hope that the master runs finish before the secondary ones complete. Ideally, i will be able to comission secondary runs only when the first master runs have NOT completed AND there are cores available (nicknamed 'ThereAreFreeCores()' below). Is there a way to do this?

import multiprocessing
import multiprocessing.pool
from contextlib import ExitStack
import time
import random

class BoundedQueuePool:
    def __init__(self, limit, semaphore_type):
        self._semaphore = semaphore_type(limit)

    def release(self, result, callback=None):
        self._semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        self._semaphore.acquire()
        callback_fn = self.release if callback is None else lambda result: self.release(result, callback=callback)
        error_callback_fn = self.release if error_callback is None else lambda result: self.release(result, callback=callback)
        return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)


class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        limit = self._processes + max_waiting_tasks
        BoundedQueuePool.__init__(self, limit, multiprocessing.BoundedSemaphore)


def listener (q, csv_names):
    '''
    Listens for results from each run and prints it to file.
    '''
    with ExitStack() as stack:
        files = {key: stack.enter_context(open(csv_names[key], "w")) for key in csv_names}
        for key in csv_names:
            files[key].write('Value\n')
            files[key].flush()


        while True:
            m = q.get()
            if m == 'STOP':
                break

            ff = m[0]
            files[ff].write(m[1])
            files[ff].flush()

    return


def runcode(q):
    x = random.random()
    time.sleep(x*10)
    q.put([1, str(x)+'\n'])
    
    return



iterlimit = 40
csv_names = {1:"./file1.csv"}
count = 0

manager = multiprocessing.Manager()
q = manager.Queue()    
pool = BoundedQueueProcessPool(multiprocessing.cpu_count() + 2)
pool2 = BoundedQueueProcessPool(multiprocessing.cpu_count() + 2) ###

pool.apply_async(listener, (q,csv_names)) # Activating Listener - Handles read and write

while count <= iterlimit: # Lets call these runs as master runs
    count += 1
    pool.apply_async(runcode, (q,)) 

while len(pool._cache) > 1: # Waiting for runs to finish
    # New chunk I want to add
    # if ThereAreFreeCores(): ###
    #     pool2.apply_async(runcode, (q,)) ### Lets call these runs secondary runs
    continue

# Now ALL master runs are finished but secondary runs MIGHT not be done
pool.close()
pool2.terminate() ###
q.put('STOP')
pool.join()
Abilash
  • 95
  • 10

1 Answers1

1

I have a potential solution that uses a single multiprocessing pool, but it is the standard one and not the BoundedQueueProcessPool class you are using. First, I saw no reason to be using this since the number of master tasks you are submitting does not seem to be anything that is going to stress memory, which is the main purpose for such a pool. You can always reinstate the BoundedQueueProcessPool if you feel you really need it. But after the first iterlimit master tasks have been submitted, I only submit additional tasks when there is a free process. So the queue remains bounded as a result.

Please read the comments in the code as I have put much of the explanation there. Basically, I know how many master tasks are being submitted and as each one completes I know how many master tasks left to complete are. I also know the pool size and so it is easy to deduce whether there is a free pool process or not available for submitting additional tasks.

Because I am now using a single pool, which I want to terminate as soon as the last master task has completed, I need for the listener not to be running in the pool. But note that my pool-termination condition is when the all the original master tasks have completed. Non-master tasks submitted could terminate before a submitted task completes. So your CSV file can have more than iterlimit rows (not counting the first, header row). But I believe that is what you want.

import multiprocessing
from contextlib import ExitStack
import time
import random

def listener (q, csv_names):
    '''
    Listens for results from each run and prints it to file.
    '''
    with ExitStack() as stack:
        # Use dict.items method
        files = {key: stack.enter_context(open(path, "w")) for key, path in csv_names.items()}
        for key in csv_names:
            files[key].write('Value\n')
            files[key].flush()

        while True:
            m = q.get()
            if m == 'STOP':
                break

            ff = m[0]
            files[ff].write(m[1])
            files[ff].flush()

def init_pool_processes(the_queue):
    global q

    q = the_queue

def runcode():
    x = random.random()
    time.sleep(x*10)
    q.put([1, str(x)+'\n'])

iterlimit = 40
csv_names = {1: "./file1.csv"}

# Use a multiprocessing queue
q = multiprocessing.Queue()
# Use a regular multiprocessing.pool.Pool instance since:
# (1) number of tasks submitted is not high and
# (2) We don't want our callback function to block.
# Non-master tasks will only be submitted as submitted tasks complete,
# so that will keep the queue size bounded to iterlimit.
# The maximum number of non-master tasks that can be submitted is
# determined by the pool size.
# For example, if the pool size were N, then the first non-master task will
# be submitted when the number of uncompleted master tasks is N-1,
# implying that there is a free pool process ready to execute that
# task. Then only when a task completes (master of non-master) will
# the next non-master task be submitted. When the last master task
# completes, any non-master tasks still running will be killed.
pool_size = multiprocessing.cpu_count() # Why + 2?
pool = multiprocessing.Pool(pool_size, initializer=init_pool_processes, initargs=(q,))

# Use a separate process, which will become clearer why:
listener_process = multiprocessing.Process(target=listener, args=(q, csv_names))
listener_process.start()

# By using a callback there is no looping required:
# Callback for master tasks:
def master_callback(result):
    global master_tasks_left_to_run

    master_tasks_left_to_run -= 1
    if master_tasks_left_to_run == 0:
        # All the master tasks have completed, so kill the pool
        # (this is why the listener is a separate process):
        pool.terminate()
        # And tell the listener to end up when it has processed all the results
        # that are currently in the queue:
        q.put('STOP')
    elif master_tasks_left_to_run < pool_size:
        # Now we can submit non master tasks:
        non_master_callback(result)

# Callback for non-master-tasks:
def non_master_callback(result):
    # Just submit another non-master task:
    pool.apply_async(runcode, callback=non_master_callback, error_callback=non_master_callback)

master_tasks_left_to_run = iterlimit
for _ in range(iterlimit):
    pool.apply_async(runcode, callback=master_callback, error_callback=master_callback)

# Now wait for all file writing to complete:
listener_process.join()
Booboo
  • 38,656
  • 3
  • 37
  • 60
  • Just minor comments/replies to your comments. - I had the +2 in the pool size since i read somewhere that addding that will force all cores to be used. - Do we need to consider at N-2 processes left instead of N-1 since the listener will be running constandly on one of the cores? - I will be making 2 modifications. Currently, dont think they should cause any issues but will update if some issues pop up – Abilash Jun 22 '23 at 14:30
  • Also, what is happening inside "initializer=init_pool_processes"? It looks like the queue is being replaced by the the_queue input arguemen – Abilash Jun 22 '23 at 14:38
  • In your original code `q` was being passed an argument to `runcode`. But after changing `q` from a *managed queue* instance to a `multiprocessing.Queue` instance it can no longer be passed as an argument to the `pool.apply_async` method call. Instead, `init_pool_processes` creates a *global variable `q`* in each pool process initialized to the queue that the main process creates. Now when `runcode` references `q` it is referencing the global variable instead of what was an argument to the function. (more...) – Booboo Jun 22 '23 at 14:46
  • The created pool calls `init_pool_processes(q)` once for each pool process as part of that process's initialization. In `init_pool_processes` the name of the argument being passed to it, which is arbitrary, is `the_queue` and it is then assigned to `global q`. – Booboo Jun 22 '23 at 14:48
  • Where did you read that using a pool size of number of cores + 2 will force all cores to be used? This is news to me. As far as what the pool size *should* be, it depends on the nature of the work being done in `runcode` and I can't give a definite rule. If it were doing a lot of network requests, it might be profitable to use a pool size greater than the number of cores. Also, there is a distinction between physical cores and logical cores. My desktop has 8 logical cores but only 4 physical cores (half of `multiprocessing.cpu_count()`). If `runcode` is 100% CPU, 4 might be the best pool size. – Booboo Jun 22 '23 at 14:59
  • As far as leaving a core free for use by `listener`, it is mostly waiting for your process pool to produce another result by calling `q.get()` and then it does disk I/O putting the process again into a wait state until the I/O is complete. It might be wasteful to reduce your multiprocessing pool size by 1 for a process that is not doing much CPU. Try a pool size using all cores, logical and physical and then run again specifying a pool size equal to the number of physical cores only and compare. Then using the best value, try it again with a pool size one less and compare again. – Booboo Jun 22 '23 at 15:07
  • See [this](https://stackoverflow.com/questions/40217873/multiprocessing-use-only-the-physical-cores). – Booboo Jun 22 '23 at 15:14
  • The +2 might have been by some inexperienced guy in some old stackoverflow post. – Abilash Jun 23 '23 at 17:03
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/254220/discussion-between-abilash-and-booboo). – Abilash Jun 23 '23 at 17:04
  • When I try to go into chat it hangs loading the page. – Booboo Jun 23 '23 at 17:26