OK, I'll expand on this as an answer.

The whole point of a multiprocessing.Pool is to spawn a number of processes and then distribute the work over them in a first-free-first-tasked fashion. This means that if you have n items to process and p processes in your pool, it will pick p (or p * chunksize, if chunksize is defined) items and send each of them to a separate process for processing. Once a process finishes its item and is effectively freed, if there are still unprocessed items the pool will pick up the next one, send it to the freed process, and so on until there are no items left. This ensures optimal utilization of your spawned processes without the need to manage the distribution yourself.
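You can observe this behavior with a minimal sketch (the work function and the sleep are made up here purely to simulate uneven workloads, they are not part of your code):

import multiprocessing
import os
import time

def work(item):  # a made-up task standing in for your real work function
    time.sleep(0.01 * (item % 3))  # simulate uneven per-item workloads
    return item, os.getpid()  # report which worker process handled the item

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # chunksize=2 - each idle worker grabs two items at a time
        for item, pid in pool.imap(work, range(20), chunksize=2):
            print("item {} was processed by worker PID {}".format(item, pid))

Whichever worker frees up first grabs the next chunk, so faster workers naturally end up processing more items.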
This also means that multiprocessing.Pool is not suitable for every situation. In your case, based on the code presented, you want to split your iterable evenly over a fixed number of processes, so the Pool would just be overhead - there would be no more data to distribute once a process finishes its chunk. If you just want to split the data and send each chunk to a different process, it's as simple as:
import multiprocessing

if __name__ == "__main__":  # always guard your multiprocessing code
    cores = max(multiprocessing.cpu_count() - 1, 1)  # ensure at least one process
    works = [(p, s, hit_rates) for p, s in sampled_patterns.items()]
    chunk_size = (len(works) + cores - 1) // cores  # rough chunk size estimate
    processes = []  # a simple list to hold our process references
    for i in range(cores):
        work_set = works[i*chunk_size:(i+1)*chunk_size]
        process = multiprocessing.Process(target=get_hit_rules, args=(work_set,))
        process.start()
        processes.append(process)
    for process in processes:  # wait for all processes to finish
        process.join()  # NOTE: join() only blocks, it does not return the worker's data
This will do exactly what you attempted to do - start a number of processes based on cpu_count() and send each one a (roughly - the last process will get a bit less on average) evenly sized chunk of your data, so that all of your data gets processed in parallel at once. Keep in mind, though, that Process.join() only waits for a worker to exit - if you want the computed hit rules back in the parent process, the workers have to send them over an explicit channel such as a multiprocessing.Queue.
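Here's a minimal sketch of that, reusing the snippet above - the worker() wrapper is hypothetical and assumes get_hit_rules returns the data you want back, which is not shown in your original code:

import multiprocessing

def worker(work_set, queue):  # hypothetical wrapper around your get_hit_rules
    queue.put(get_hit_rules(work_set))  # push this chunk's result to the parent

if __name__ == "__main__":
    cores = max(multiprocessing.cpu_count() - 1, 1)
    works = [(p, s, hit_rates) for p, s in sampled_patterns.items()]
    chunk_size = (len(works) + cores - 1) // cores
    queue = multiprocessing.Queue()  # the channel the workers report back on
    processes = []
    for i in range(cores):
        work_set = works[i*chunk_size:(i+1)*chunk_size]
        process = multiprocessing.Process(target=worker, args=(work_set, queue))
        process.start()
        processes.append(process)
    results = [queue.get() for _ in processes]  # one result per spawned process
    for process in processes:
        process.join()

Draining the queue before joining matters - if a worker's result is large enough to fill the queue's underlying pipe buffer, joining first would deadlock.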
Of course, if your data is too big, as you additionally clarified in the comment, this will end up being unmanageable, and then you can fall back to multiprocessing.Pool to send manageable chunks of your data to the spawned processes to work through one after another. Building up the works list then becomes pointless, too - why would you build a list with billions of items when you already have the data in your sampled_patterns dict? Why not send slices of your sampled_patterns dict directly, without building an intermediary list just so you can map it to the multiprocessing.Pool? All you need is some sort of an iterator slicer fed to multiprocessing.Pool.imap instead, letting the pool manage the rest internally, so:
import itertools
import multiprocessing

def patterns_slicer(patterns, size, hit_rates):
    iterator = iter(patterns.items())  # walk the dict's items lazily
    while True:
        # take the next `size` items without materializing the whole dict
        chunk = [(p, s, hit_rates) for p, s in itertools.islice(iterator, size)]
        if not chunk:  # the iterator is exhausted
            return
        yield chunk

if __name__ == "__main__":  # always guard your multiprocessing code
    cores = max(multiprocessing.cpu_count() - 1, 1)  # ensure at least one process
    pool = multiprocessing.Pool(processes=cores)
    # let's use chunks of 100 patterns each
    results = pool.imap(get_hit_rules, patterns_slicer(sampled_patterns, 100, hit_rates))
    results = list(results)  # imap returns an iterator - consume it to collect the results
    pool.close()
    pool.join()
Of course, multiprocessing.Pool.imap does a lot of lookahead, so if your original data is too large, or if you want to use huge chunks, you might want to consider implementing your own imap with just-in-time data retrieval. Check this answer for an example.
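The general idea behind just-in-time retrieval (a sketch of the technique, not the code from the linked answer - jit_map and in_flight are names made up here) is to cap how many chunks are submitted but not yet finished, so the slicer is only advanced when a worker actually needs more data:

import multiprocessing
import threading

def jit_map(pool, func, chunk_iterator, in_flight):
    # allow at most `in_flight` submitted-but-unfinished chunks at any time
    limiter = threading.BoundedSemaphore(in_flight)

    def release(_result):  # callbacks run in the pool's result-handler thread
        limiter.release()

    async_results = []
    for chunk in chunk_iterator:  # nothing beyond this point is read ahead of time
        limiter.acquire()  # block until one of the in-flight slots frees up
        async_results.append(
            pool.apply_async(func, (chunk,), callback=release, error_callback=release)
        )
    return [r.get() for r in async_results]  # re-raises any worker exception

if __name__ == "__main__":
    cores = max(multiprocessing.cpu_count() - 1, 1)
    pool = multiprocessing.Pool(processes=cores)
    results = jit_map(pool, get_hit_rules,
                      patterns_slicer(sampled_patterns, 100, hit_rates),
                      in_flight=cores * 2)
    pool.close()
    pool.join()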