1

I read a lot of posts about parallelization using the multiprocessing module but none of them quite answered my question.

I have a very long generator giving me parameter values and for each I want to compute some function value. However, I only want to save the best n many, since I am only interested in the best ones and saving all of the results would blow up the RAM. There way I see it, there are two ways to do this: 1) use a common shared memory between the processes where the best values are saved or 2) keep separate lists of the best results for each core/process and later manually merge these lists together.

I think the second method would be better, however I am not sure how to implement this. This is what I got so far:

import numpy as np
import multiprocessing
from functools import partial


def get_generator(length: int):
    for i in range(length):
        yield [i, i + 1]


def some_func(x, other_stuff):
    y = np.sum(x)
    return y


def task(other_stuff, x: np.ndarray):
    val = some_func(x, other_stuff)
    
    if val > task.some_dict['min']:
        task.l.append(val)
        task.some_dict['min'] = val
    return


def task_init(l, some_dict):
    task.l = l
    task.some_dict = some_dict
    task.some_dict['min'] = np.NINF

n = 20
generator = get_generator(n)
other_stuff = np.nan

func = partial(task, other_stuff)

l = multiprocessing.Manager().list()
some_dict = multiprocessing.Manager().dict()

p = multiprocessing.Pool(None, task_init, [l, some_dict])

p.imap(func, generator, chunksize=10000)

p.close()
p.join()

This would be somewhat similar to what I want to do. But I really care about performance and in the actual code the comparison/saving of the best values will be more complex so I think that the shared memory approach would be really slow.

My question boils down to: If I have e.g. 8 cores, how could I have 8 lists of the best results each for one core that will be returned, so that the cores work completely independent and rather quick?

Thank you very much!

user3666197
  • 1
  • 6
  • 50
  • 92
Laaag
  • 13
  • 2
  • 1
    You can use a heap/priority queue of size 'n'. Regarding having 8 different lists having the best n results, I would say its better to let Python handle that for you. You need to just define the task that you want to do. Then execute the tasks for different inputs on a multiprocess pool (Python will automatically have the defualt pool size equal to number of cores in your machine). Whatever result you get from a task, just insert that into a heap of size n. At the end of all task invocations, you will have the best n results in the heap. – Anmol Singh Jaggi Oct 19 '20 at 14:36
  • I just want to add to the comment made by @AnmolSinghJaggi. Python's heap queue implementation keeps the smallest value at the root (at index 0) rather than the largest and that is what you want. You will keep adding your results to the heap until you have a heap size of n. Thereafter you compare a new result with the smallest on the heap, which is at index 0. If the new result is not larger, you throw it away. Otherwise, you pop the smallest from the heap and throw it away and add the new result to maintain the heap size of n. – Booboo Oct 19 '20 at 20:23
  • 10,000 is a fairly large chunk size. The jobs will be doled out to each process in that size chunks. You have 8 processors. If you only had 30,000 jobs altogether, for example, 5 processors would never be given work at all to do and the other 3 processors would be dividing the 30,000 jobs among themselves. – Booboo Oct 19 '20 at 20:35
  • @Booboo thank you for the clarification. Is there some rule of thumb for a reasonable chunk size? I copied 10,000 from a different answer. All I know is that it should be bigger than 1. – Laaag Oct 19 '20 at 22:43
  • Thank you for the answers @AnmolSinghJaggi I will use the heap-approach then. I think, I can use `multiprocessing.Manager().list()` for the heap, right? – Laaag Oct 19 '20 at 22:54
  • @Laaag Well, I would think the upper bound would be the number of tasks divided by the number CPUs you have. Just a thought: Why have a managed dictionary and list at all? Have `task` return every `val` back to the main process . You can then iterate all the results returned by your call to `imap` (or better yet, `imap_unordered`) and build your heap as I outlined in a prior comment – Booboo Oct 19 '20 at 23:39
  • Agree with @Booboo – Anmol Singh Jaggi Oct 20 '20 at 06:03

1 Answers1

1

These are my comments put into action. I hope your actual task is a more complicated computation or it would be hardly worth using multiprocessing.

import numpy as np
import multiprocessing
from functools import partial
from heapq import *

def get_generator(length: int):
    for i in range(length):
        yield [i, i + 1]


def some_func(x, other_stuff):
    y = np.sum(x)
    return y


def task(other_stuff, x: np.ndarray):
    val = some_func(x, other_stuff)
    return val



def main():
    n = 20
    generator = get_generator(n)
    other_stuff = np.nan

    func = partial(task, other_stuff)

    cpu_count = multiprocessing.cpu_count() - 1 # leave a processor for the main process
    chunk_size = n // cpu_count
    HEAPSIZE = 8
    with multiprocessing.Pool(cpu_count) as pool:
        heap = []
        for val in pool.imap_unordered(func, generator, chunksize=chunk_size):
            if len(heap) < HEAPSIZE:
                heappush(heap, val)
            elif val > heap[0]:
                heappushpop(heap, val)
        # sort
        values = sorted(heap, reverse=True)
        print(values)


if __name__ == '__main__':
    main()

Prints:

[39, 37, 35, 33, 31, 29, 27, 25]

Update

I found it best with the following experiment to allocate to the pool a number of processes equal to mp.cpu_count() - 1 to leave the main process a free proceesor to handle the results returned by the workers. I also experimented with the chunksize parameter:

import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for n in range(10000):
        s += i * i # square the argument
    s /= 10000
    return s

def main():
    cpu_count = mp.cpu_count() - 1 # leave a processor for the main process
    N = 10000
    chunk_size = N // cpu_count # 100 may be good enough
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
        #print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)

On my desktop (running other processes, such as streaming music), the above code did better with assigning mp.cpu_count() - 1 to cpu_count (2.4 seconds vs, 2.5 seconds). Here are other timings (rounded to one decimal place):

chunksize = 1428 -> 2.4 seconds (N // (mp.cpu_count() - 1)
chunksize = 1000 -> 2.7 seconds
chunksize = 100 -> 2.4 seconds
chunksize = 10 -> 2.4 seconds
chunksize = 1 -> 2.6 seconds

The result for a chunksize value of 1000 is a bit of an anomaly. I would suggest trying different values, otherwise N // (mp.cpu_count() - 1). This is assuming you can compute N, the number of items in the iterable. When you have a generator as the iterable, you would have to, in the general case, convert it first to a list, to be able to get its length. Even a chunksize value of 1 in this particular benchmark did not do that much worse. But this is what I have learned from varying the amount of work worker_process has to do:

The more work (i.e. CPU) your worker process has to do to complete its task, the less sensitive it is to the chunksize parameter. If it returns after using very little CPU, then the overhead of transferring the next chunk becomes significant and you want to keep the number of chunk transfers to a small value (i.e. you want a large chunksize value). But if the process is long running, the overhead of transferring the next chunk will not be as impactful.

In the following code the worker process's CPU requirements are trivial:

import multiprocessing as mp
import timeit

def worker_process(i):
    return i ** 2

def main():
    cpu_count = mp.cpu_count() - 1
    N = 100000
    chunk_size = N // cpu_count
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
        print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)

The timings:

chunksize = 1428 -> .19 seconds
chunksize = 100 -> .39 seconds
chunksize = 1 -> 11.06 seconds

In the following code the worker process's CPU requirements are more substantial:

import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for _ in range(1000000):
        s += i * i
    return s // 1000000


def main():
    cpu_count = mp.cpu_count() - 1
    N = 1000
    chunk_size = N // cpu_count
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
        print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)

The timings:

chunksize = 142 -> 22.6 seconds (N // (mp.cpu_count() - 1)
chunksize = 10 -> 23.5 seconds
chunksize = 1 -> 23.2 seconds

Update 2

According to Python multiprocessing: understanding logic behind chunksize, when methods map, starmap or map_async are called with chunksize=None there is a specific algorithm used to compute a chunksize, which I have used in the code below. I don't know why the default value for methods imap and imap_unordered is 1 and does not use this same algorithm. Perhaps because that wouldn't be "lazy" as implied by the description of these methods. In the following code, which repeats the previous benchmark, I use a redefinition of the same algorithm for computing the default chunksize:

import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for _ in range(1000000):
        s += i * i
    return s // 1000000


def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize


def main():
    cpu_count = mp.cpu_count() - 1
    N = 1000
    chunk_size = compute_chunksize(cpu_count, N)
    print('chunk_size =', chunk_size)
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
        print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)

Timings:

chunksize 36 -> 22.2 seconds
Booboo
  • 38,656
  • 3
  • 37
  • 60
  • Thank you very much for your help! The actual tasks and data will be more complicated, hence names such as `some_func`, `other_stuff` and the useless generator, I just wanted to have some example :) – Laaag Oct 20 '20 at 00:41
  • I've updated the answer relating to the pool size and added some benchmarks relating to *chunksize*. – Booboo Oct 20 '20 at 12:59
  • Also, see https://stackoverflow.com/questions/53751050/python-multiprocessing-understanding-logic-behind-chunksize – Booboo Oct 20 '20 at 13:17