These are my comments put into action. I hope your actual task is a more complicated computation, or it would hardly be worth using multiprocessing.
import numpy as np
import multiprocessing
from functools import partial
from heapq import heappush, heappushpop

def get_generator(length: int):
    # stand-in for an iterable whose items are produced lazily
    for i in range(length):
        yield [i, i + 1]

def some_func(x, other_stuff):
    # placeholder for the real (presumably more expensive) computation
    y = np.sum(x)
    return y

def task(other_stuff, x: np.ndarray):
    val = some_func(x, other_stuff)
    return val

def main():
    n = 20
    generator = get_generator(n)
    other_stuff = np.nan
    func = partial(task, other_stuff)
    cpu_count = multiprocessing.cpu_count() - 1  # leave a processor for the main process
    chunk_size = max(1, n // cpu_count)  # guard against a chunksize of 0 on many-core machines
    HEAPSIZE = 8
    with multiprocessing.Pool(cpu_count) as pool:
        heap = []
        for val in pool.imap_unordered(func, generator, chunksize=chunk_size):
            # min-heap holding the HEAPSIZE largest values seen so far
            if len(heap) < HEAPSIZE:
                heappush(heap, val)
            elif val > heap[0]:
                heappushpop(heap, val)
    # sort from largest to smallest
    values = sorted(heap, reverse=True)
    print(values)

if __name__ == '__main__':
    main()
Prints:
[39, 37, 35, 33, 31, 29, 27, 25]
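If nothing else needs to happen inside the result loop, heapq.nlargest can do the same top-N bookkeeping in one call. A minimal sketch, reusing the names from the code above:

from heapq import nlargest

with multiprocessing.Pool(cpu_count) as pool:
    # nlargest consumes the iterator and keeps only the HEAPSIZE largest values,
    # returned already sorted from largest to smallest
    values = nlargest(HEAPSIZE, pool.imap_unordered(func, generator, chunksize=chunk_size))
print(values)

The explicit heap loop is still the better choice if you need to do additional per-result processing as each value arrives.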
Update
I found, with the following experiment, that it is best to allocate to the pool a number of processes equal to mp.cpu_count() - 1, which leaves the main process a free processor to handle the results returned by the workers. I also experimented with the chunksize parameter:
import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for n in range(10000):
        s += i * i  # square the argument
    s /= 10000
    return s

def main():
    cpu_count = mp.cpu_count() - 1  # leave a processor for the main process
    N = 10000
    chunk_size = N // cpu_count  # 100 may be good enough
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
    #print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)
On my desktop (running other processes, such as streaming music), the above code did better with mp.cpu_count() - 1 assigned to cpu_count (2.4 seconds vs. 2.5 seconds). Here are other timings (rounded to one decimal place):
chunksize = 1428 -> 2.4 seconds (N // (mp.cpu_count() - 1))
chunksize = 1000 -> 2.7 seconds
chunksize = 100 -> 2.4 seconds
chunksize = 10 -> 2.4 seconds
chunksize = 1 -> 2.6 seconds
The result for a chunksize value of 1000 is a bit of an anomaly. I would suggest trying different values, otherwise use N // (mp.cpu_count() - 1). This assumes you can compute N, the number of items in the iterable. When the iterable is a generator, in the general case you would first have to convert it to a list to get its length (see the sketch below). Even a chunksize value of 1 in this particular benchmark did not do that much worse. But this is what I have learned from varying the amount of work worker_process has to do:
The more work (i.e. CPU) your worker process has to do to complete its task, the less sensitive it is to the chunksize parameter. If it returns after using very little CPU, then the overhead of transferring the next chunk becomes significant and you want to keep the number of chunk transfers small (i.e. you want a large chunksize value). But if the task is long running, the overhead of transferring the next chunk will not be as impactful.
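As a minimal sketch of the generator case mentioned above (generator and task stand in for your own iterable and worker function), materializing the generator once gives you the length needed for the chunksize computation and a list you can then submit to the pool:

import multiprocessing as mp

items = list(generator)  # materialize the generator once to learn its length
chunk_size = max(1, len(items) // (mp.cpu_count() - 1))
with mp.Pool(mp.cpu_count() - 1) as pool:
    for result in pool.imap_unordered(task, items, chunksize=chunk_size):
        ...  # process each result as it arrives

Of course, this only makes sense if the generator's items fit comfortably in memory; otherwise you are stuck with picking a chunksize without knowing the length.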
In the following code the worker process's CPU requirements are trivial:
import multiprocessing as mp
import timeit

def worker_process(i):
    return i ** 2

def main():
    cpu_count = mp.cpu_count() - 1
    N = 100000
    chunk_size = N // cpu_count
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
    print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)
The timings:
chunksize = 1428 -> .19 seconds
chunksize = 100 -> .39 seconds
chunksize = 1 -> 11.06 seconds
In the following code the worker process's CPU requirements are more substantial:
import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for _ in range(1000000):
        s += i * i
    return s // 1000000

def main():
    cpu_count = mp.cpu_count() - 1
    N = 1000
    chunk_size = N // cpu_count
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
    print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)
The timings:
chunksize = 142 -> 22.6 seconds (N // (mp.cpu_count() - 1))
chunksize = 10 -> 23.5 seconds
chunksize = 1 -> 23.2 seconds
Update 2
According to Python multiprocessing: understanding logic behind chunksize, when the methods map, starmap or map_async are called with chunksize=None, a specific algorithm is used to compute a chunksize, and I have used that algorithm in the code below. I don't know why the default value for the imap and imap_unordered methods is 1 and why they do not use this same algorithm. Perhaps that is because it would not be "lazy", as implied by the description of these methods. In the following code, which repeats the previous benchmark, I use a redefinition of the same algorithm for computing the default chunksize:
import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for _ in range(1000000):
        s += i * i
    return s // 1000000

def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

def main():
    cpu_count = mp.cpu_count() - 1
    N = 1000
    chunk_size = compute_chunksize(cpu_count, N)
    print('chunk_size =', chunk_size)
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
    print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)
Timings:
chunksize = 36 -> 22.2 seconds
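As a usage sketch (the helper name imap_unordered_auto is my own, not part of multiprocessing), the same algorithm can be wrapped so that any iterable, including a generator, is given the map-style default chunksize:

import multiprocessing as mp

def worker_process(i):
    return i * i  # trivial stand-in for the real work

def compute_chunksize(pool_size, iterable_size):
    # same divmod-based default that map/starmap/map_async use when chunksize=None
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

def imap_unordered_auto(pool, pool_size, func, iterable):
    # materialize generators so the length (and hence the chunksize) is known
    items = list(iterable)
    chunksize = max(1, compute_chunksize(pool_size, len(items)))
    return pool.imap_unordered(func, items, chunksize=chunksize)

if __name__ == '__main__':
    cpu_count = mp.cpu_count() - 1
    with mp.Pool(cpu_count) as pool:
        results = list(imap_unordered_auto(pool, cpu_count, worker_process, range(1000)))
    print(results[0:10])

This keeps the chunksize decision in one place instead of repeating the computation in every main().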