
I am trying to distribute all my jobs evenly between 16 processors using Pool. What I noticed is that 16 processes are spawned initially, but after a couple of seconds only 2 processes execute all the remaining work when the number of jobs is small. No matter how much I increase the load, there seems to be a steady decrease in the number of processes working on it; eventually only 1 or 2 processes go through the remaining jobs.

Here is the multiprocessing snippet from my code.

c_size = len(sampled_patterns) / (cpu_count() - 1)

pool = Pool(processes=cpu_count() - 1)
works = [(pattern, support_set, hit_rates) for pattern, support_set in sampled_patterns.items()]
pool.starmap(get_hit_rules, works, chunksize=int(c_size))

Is there any way to use all 16 processors to maximize parallelization? Thanks!

Edit: This is how the tasks are distributed (a Counter with the process PIDs as keys and the number of tasks handled as values):

Counter({30179: 14130, 30167: 13530, 30169: 12900, 30173: 12630, 30165: 12465, 30177: 12105, 30163: 11820, 30175: 11460, 30161: 10860, 30181: 10725, 30183: 9855, 30157: 8695, 30159: 6765, 30171: 4860, 30155: 1770})
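
(A per-pid tally like this can be collected by having each task report os.getpid(); below is a minimal sketch, with `get_hit_rules_traced` as a hypothetical wrapper around the real `get_hit_rules`.)

import os
from collections import Counter
from multiprocessing import Pool, cpu_count

def get_hit_rules_traced(pattern, support_set, hit_rates):
    # hypothetical wrapper: do the real work, then report which process handled this task
    get_hit_rules(pattern, support_set, hit_rates)
    return os.getpid()

if __name__ == "__main__":
    works = [(pattern, support_set, hit_rates) for pattern, support_set in sampled_patterns.items()]
    c_size = len(works) // (cpu_count() - 1)  # same chunking as in the snippet above
    with Pool(processes=cpu_count() - 1) as pool:
        pids = pool.starmap(get_hit_rules_traced, works, chunksize=c_size)
    print(Counter(pids))  # tasks handled per worker pid
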
Raja
  • `chunksize` is not doing what you think it is doing - set it to the number of processes you have in your pool (i.e. `pool._processes`) if you want to split the `works` iterable into evenly sized chunks over all of the processes in the pool (see the sketch below these comments). Though, if you want to do that, the real question is why you need a `Pool` at all? – zwer Dec 07 '17 at 10:48
  • Thank you. This is my first code using multiprocessing. I am using Pool because the code looked less scary than spawning many Process objects; I thought Pool would take care of it for me. I am just now reading up on Pool vs Process. Is there a better way? – Raja Dec 07 '17 at 10:57
  • I will have millions, if not billions, of items in `works`, so I think `Pool` is more suited than spawning as many `Process` objects. – Raja Dec 07 '17 at 11:11
  • Changing the chunk size did not help! – Raja Dec 07 '17 at 12:05
  • [Python multiprocessing: understanding logic behind chunksize](https://stackoverflow.com/q/53751050/9059420) – Darkonaut Feb 21 '19 at 18:24
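
A minimal sketch of the chunksize suggestion above (using the names from the question; chunksize is computed with ceiling division so `works` is split into one evenly sized chunk per pool process):

from multiprocessing import Pool, cpu_count

if __name__ == "__main__":
    n_procs = cpu_count() - 1
    works = [(pattern, support_set, hit_rates)
             for pattern, support_set in sampled_patterns.items()]
    c_size = -(-len(works) // n_procs)  # ceiling division: one chunk per process
    with Pool(processes=n_procs) as pool:
        pool.starmap(get_hit_rules, works, chunksize=c_size)
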

1 Answer


Ok, I'll expand on this as an answer.

The whole point of a multiprocessing.Pool is to spawn a number of processes and then distribute the work over them in a first-free-first-tasked fashion. This means that if you have n items to process and p processes in your pool, it will pick p items (or p * chunksize items, if chunksize is defined) and send them to the separate processes for processing. Once a process finishes processing an item and is effectively freed, and there are still unprocessed items left, the pool picks up the next item, sends it to the freed process, and so on until there are no more items left. This ensures optimal utilization of your spawned processes without the need to manage the distribution yourself.
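
Here is a tiny self-contained illustration of that behaviour (a toy `work` function, not the code from the question): one deliberately slow item keeps a single worker busy while the free workers keep picking up the remaining items.

import os
import time
from multiprocessing import Pool

def work(n):
    # one deliberately slow task, the rest are fast
    time.sleep(2 if n == 0 else 0.1)
    return n, os.getpid()

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # chunksize=1: whenever a worker frees up, it grabs the next single item
        for n, pid in pool.imap_unordered(work, range(20), chunksize=1):
            print("item %d was processed by pid %d" % (n, pid))
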

This also means that multiprocessing.Pool is not suitable for every situation. In your case, based on the code presented, you want to split your iterable evenly over a fixed number of processes, so the Pool would just be overhead - there would be no more data to distribute once a process finishes its chunk. If you just want to split the data and send each chunk to a different process, it's as simple as:

import multiprocessing

if __name__ == "__main__":  # always guard your multiprocessing code
    cores = max(multiprocessing.cpu_count() - 1, 1)  # ensure at least one process

    works = [(p, s, hit_rates) for p, s in sampled_patterns.items()]
    chunk_size = (len(works) + cores - 1) // cores  # rough chunk size estimate

    processes = []  # a simple list to hold our process references
    for i in range(cores):
        work_set = works[i*chunk_size:(i+1)*chunk_size]
        process = multiprocessing.Process(target=get_hit_rules, args=(work_set,))
        process.start()
        processes.append(process)

    for process in processes:  # wait for every process to finish
        process.join()  # note: join() only waits; it returns None, not the worker's result

This will do exactly what you attempted to do - start cpu_count() - 1 processes and send each one a (roughly; the last process will get a bit less data on average) evenly sized chunk of your data, so that all of your data gets processed at once, in parallel.
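
One caveat with the snippet above: Process.join() only waits for a process to exit, it does not hand back anything the worker computed. If get_hit_rules produces results that you need in the parent process, one option (just a sketch, with `worker` as a hypothetical wrapper) is to pass a multiprocessing.Queue to each process and drain it before joining:

import multiprocessing

def worker(work_set, queue):
    # hypothetical wrapper: run the real function and ship its result back to the parent
    queue.put(get_hit_rules(work_set))

if __name__ == "__main__":
    cores = max(multiprocessing.cpu_count() - 1, 1)
    works = [(p, s, hit_rates) for p, s in sampled_patterns.items()]
    chunk_size = (len(works) + cores - 1) // cores

    queue = multiprocessing.Queue()
    processes = []
    for i in range(cores):
        chunk = works[i*chunk_size:(i+1)*chunk_size]
        process = multiprocessing.Process(target=worker, args=(chunk, queue))
        process.start()
        processes.append(process)

    results = [queue.get() for _ in processes]  # drain before joining so no worker blocks on a full pipe
    for process in processes:
        process.join()
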

Of course, if your data is too big, as you've additionally clarified in the comments, this will end up being unmanageable, and then you can revert to multiprocessing.Pool to send manageable chunks of your data to the spawned processes one after another. Additionally, building up the works list is then pointless, too - why build a list with billions of items when you already have the data in your sampled_patterns dict?

Why not send individual items from your sampled_patterns dict instead, without building an intermediary list just so you can map it onto the multiprocessing.Pool? To do so, all you need is to create some sort of iterator slicer and feed it to multiprocessing.Pool.imap, letting the pool manage the rest internally:

import multiprocessing

def patterns_slicer(patterns, size, hit_rates):
    pos = 0  # store our current position
    patterns = list(patterns.items())  # materialize the items so we can slice them below
    while pos < len(patterns):
        yield [(p, s, hit_rates) for p, s in patterns[pos:pos+size]]
        pos += size

if __name__ == "__main__":  # always guard your multiprocessing code
    cores = max(multiprocessing.cpu_count() - 1, 1)  # ensure at least one process
    pool = multiprocessing.Pool(processes=cores)
    # lets use chunks of 100 patterns each
    results = pool.imap(get_hit_rules, patterns_slicer(sampled_patterns, 100, hit_rates))
    # imap returns a lazy iterator - loop over `results` to actually pull the processed chunks back

Of course, multiprocessing.Pool.imap does a lot of lookahead, so if your original data is too large, or you want to use huge chunks, you might want to consider implementing your own imap with just-in-time data retrieval. Check this answer for an example.
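
Just to give a rough idea of what such just-in-time feeding can look like (a sketch, not the code from the linked answer; `feed_just_in_time` and `max_in_flight` are made-up names): submit the chunks with apply_async and use a semaphore to cap how many chunks are queued at any one time.

import multiprocessing
import threading

def feed_just_in_time(pool, func, chunks, max_in_flight):
    # allow at most max_in_flight chunks to be queued/processed at once
    gate = threading.BoundedSemaphore(max_in_flight)
    async_results = []
    for chunk in chunks:
        gate.acquire()  # blocks until one of the in-flight chunks completes
        res = pool.apply_async(func, (chunk,),
                               callback=lambda _: gate.release(),
                               error_callback=lambda _: gate.release())
        async_results.append(res)  # still keeps one handle per chunk; consume incrementally for huge inputs
    return [r.get() for r in async_results]

if __name__ == "__main__":
    cores = max(multiprocessing.cpu_count() - 1, 1)
    with multiprocessing.Pool(processes=cores) as pool:
        chunks = patterns_slicer(sampled_patterns, 100, hit_rates)
        results = feed_just_in_time(pool, get_hit_rules, chunks, max_in_flight=2 * cores)
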

zwer
  • Thank you for your effort and time! Appreciate it. I haven't been able to get imap working with my code yet. One thing I probably should have mentioned is that I am using `Manager().dict()` to update the results from every process. If I pass my jobs as a generator to imap it works, but it does nothing when I pass `iterator_slicer`. – Raja Dec 07 '17 at 22:16