
I have a dictionary of 15 variables, each with 3 values, for which I need to generate the product of all possible combinations (3**15 ≈ 14.3M combinations). I'm using multi-threading on a 12-core processor to process the combinations (and will likely move to 64 cores).

I'm using itertools.product to generate the different combinations, and ThreadPool with imap_unordered to run the multiprocessing. Additionally, I'm using a deque to discard each result as soon as it's available. However, I'm finding that memory consumption blows up to about 2.5GB. I understand that itertools.product returns a lazy iterator and therefore should not be storing much data in memory, but that doesn't seem to be the case.

Below is my code, and I'm wondering if anyone can help me figure out how I can better optimize the memory utilization.

Additionally, I'm wondering how the chunk size in imap_unordered plays a role in memory efficiency. I tried different values (10, 100, 1000, 10000) to see how it affects memory usage, but it doesn't seem to have much impact other than stabilizing memory utilization at around 2.5GB. If I don't specify a chunk size, memory tends to blow up to more than 5GB.

I also tried changing the number of threads from 12 to 1, and that also did not impact the memory usage. However, using the single-process implementation (commented out below) reduces the memory usage to only ~30MB.

import numpy as np
import itertools
import multiprocessing
import queue
import functools
from multiprocessing import pool, dummy

def dummy_func(values, keys):
    print( dict(zip(keys, values)) )
    return

def main():
    num_threads = multiprocessing.cpu_count()

    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'], 
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'], 
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'], 
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'], 
                  'i': ['7p', '16p', '22p'], 
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'], 
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'], 
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']}
    keys = list(parameters)

    # process simulations for all permutations using single process
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)

    # process simulations for all permutations using multi-threading
    with multiprocessing.pool.ThreadPool(num_threads) as workers:
        queue.deque(workers.imap_unordered(functools.partial(dummy_func, keys=keys), 
                                           itertools.product(*map(parameters.get, keys)), 100))
    return

if __name__ == "__main__":
    main()
David
  • Why are you materializing all the results in a deque, anyway? That creates the memory problem you're trying to avoid. It sounds like you may have misunderstood the concept of a deque. – user2357112 Aug 30 '21 at 01:37
  • Maybe you saw `collections.deque(iterator, maxlen=0)` used to consume an iterator somewhere and didn't realize the role of the `maxlen` argument. – user2357112 Aug 30 '21 at 01:39
  • Here's some info on what [chunksize](https://stackoverflow.com/questions/3822512/chunksize-parameter-in-multiprocessing-pool-map) does. – martineau Aug 30 '21 at 01:44
  • Your worker function, `dummy_func`, is returning `None` and that is what you are adding to your deque over and over again and it will consequently grow very large. What is the point of this? Anyway, try specifying a reasonable *maxlen* argument to the deque constructor to keep the size limited to `maxlen` elements. – Booboo Aug 30 '21 at 10:38
  • @martineau `chunksize` affects speed rather than space. – Booboo Aug 30 '21 at 10:58
  • @Booboo: I never said what it did. The OP was wondering about it so I provided a link to something that describes what it does. Personally I *think* it might affect space because it determines the approximate size of the pieces that the iterable will be split up into and submitted to each task. Bigger pieces → more memory. – martineau Aug 30 '21 at 11:30
  • `imap_unordered` saves its results to a deque internally as it goes, and does not wait for them to be consumed before computing more. Calling `queue.deque` similarly tries to collect all the results at once making the call equivalent to `map` anyway. These results may be just `None`, but if you have millions of pointers to `None`, it could still add up. – Aaron Aug 30 '21 at 18:49
  • Thanks to everyone for the feedback. I've tried these suggestions but to no avail. Below I've tried to consume the iterables while also filtering None and limiting the deque size, but I'm still seeing memory usage blow up. Any recommendations? `with multiprocessing.pool.ThreadPool(num_threads) as workers: collections.deque(filter(None, workers.imap_unordered(functools.partial(process_simulation, keys=keys), itertools.product(*map(parameters.get, keys)), chunksize=num_threads)), maxlen=0)` – David Aug 31 '21 at 02:15
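
The `collections.deque(iterator, maxlen=0)` idiom mentioned in the comments above is worth spelling out. A minimal sketch (the consume helper name here is made up) of consuming an iterator purely for its side effects, with nothing retained in memory:

from collections import deque

def consume(iterator):
    # maxlen=0 keeps the deque permanently empty, so every item pulled from
    # the iterator is discarded immediately and memory stays flat.
    deque(iterator, maxlen=0)

# Example: run a generator for its side effects only.
consume(print(i) for i in range(5))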

1 Answer


Update 2

I have since learned that if you are using multiprocessing.pool.Pool.imap or multiprocessing.pool.Pool.imap_unordered, there is no need for the special BoundedQueueProcessPool class I developed, whose purpose is to prevent tasks from being submitted to the processing pool's task queue faster than the pool's processes can run them, which makes memory explode. These two methods iterate the passed iterable only as fast as is needed to keep the pool's processes from remaining idle, so task submission is already throttled. However, if you were, for example, submitting tasks in a loop with the multiprocessing.pool.Pool.apply_async method, the task queue could grow extremely large without the throttling that BoundedQueueProcessPool provides. So, to simplify the code, we will be using the standard classes.
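
As an illustration of the difference, here is a rough sketch (this is not the author's BoundedQueueProcessPool; work, max_pending and release are made-up names) of throttling apply_async submissions with a semaphore so that no more than a bounded number of tasks is ever waiting in the pool's task queue:

import multiprocessing
import threading

def work(x):
    return x * x  # placeholder task

def main():
    max_pending = 100  # illustrative limit on tasks allowed in flight at once
    gate = threading.BoundedSemaphore(max_pending)

    def release(result):
        gate.release()  # free a slot whenever a task finishes (or fails)

    with multiprocessing.Pool() as pool:
        for x in range(10_000):
            gate.acquire()  # blocks once max_pending tasks are already queued
            pool.apply_async(work, (x,), callback=release, error_callback=release)
        pool.close()
        pool.join()

if __name__ == "__main__":
    main()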

If you do not want to blow up memory, you need two things:

  1. The iterable that generates the values passed to dummy_func must produce them incrementally. In my testing, itertools.product appeared to build up all of its values in memory before yielding the first one, so it will blow up memory regardless of anything else you do.
  2. You must process the results iterable one by one, appending each result to a deque initialized with a suitable non-zero maxlen argument. Your current code initializes the deque with the complete output of the map call, which has the length of the passed iterable. That alone will blow up memory.

To overcome the problem described in point 1, I am using the itertools.permutations generator function.

To overcome the problem described in point 2, I initialize an empty deque with maxlen=10 and append each value to it as it is returned from dummy_func.

import multiprocessing
from functools import partial
from collections import deque
from itertools import permutations

def dummy_func(values, keys):
    #print( dict(zip(keys, values)))
    ...
    return dict(zip(keys, values))

def main():
    num_threads = multiprocessing.cpu_count()

    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'],
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']
                  }

    # A more reasonably sized parameters dict for demonstration:
    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  }

    keys = list(parameters)

    # process simulations for all permutations using single process
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)

    # Keep only the 10 most recent results; older entries are discarded automatically.
    q = deque(maxlen=10)

    # Consume results one at a time as they become available instead of
    # materializing the entire output of the pool.
    with multiprocessing.Pool(num_threads) as pool:
        for v in pool.imap(partial(dummy_func, keys=keys),
                           permutations(parameters.values(), len(keys))):
            q.append(v)
    return q

if __name__ == '__main__':
    import time
    t = time.time()
    q = main()
    print(q)
    print(time.time() - t)
Booboo
  • Thanks for the detailed response. However, to be clear, the intention of dummy_func is to perform some arbitrary task on the data. The intention is not to return or retain the data to print later; that would require the data to be stored. I've also tried various maxlen settings for the deque (0, 1, 10, 100, 1000) but those did not affect memory or performance. – David Sep 01 '21 at 15:42
  • Your original function implicitly returned `None` (I am not sure why you even had the unnecessary `return` statement at all for that). Then tell me what the purpose was of blowing up memory by having a deque that seemingly (to me) stored semantically meaningless values. I have been giving you the benefit of the doubt that there has been a purpose for it and just assumed that through some possible misunderstanding or absentmindedness on your part you forgot to return a result. – Booboo Sep 01 '21 at 16:55
  • The reason the *maxlen* values you set did not affect memory is that the deque does not get constructed until `itertools.product` has yielded all of its values, and that in itself takes up a lot of memory. In my solution above the *intent*, at least, in using `imap` is to do *lazy* evaluation of `itertools.product` and to add elements to the deque one by one as they become available. However, it turns out that `itertools.product` essentially builds all of its results in memory before yielding the values one by one, and that's why memory explodes even when using `imap`. – Booboo Sep 01 '21 at 17:28
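
For what it's worth, one further way to keep memory bounded regardless of how the input iterator behaves is to feed the pool in fixed-size batches with itertools.islice. A minimal sketch (dummy_func here is a stand-in for the real work, and batch_size is an arbitrary illustrative value):

import itertools
import multiprocessing

def dummy_func(values):
    return sum(len(v) for v in values)  # placeholder work on one combination

def main():
    parameters = {'a': ['1', '2', '3'], 'b': ['x', 'y', 'z'], 'c': ['p', 'q', 'r']}
    combos = itertools.product(*parameters.values())
    batch_size = 1000  # only this many combinations (plus results) are alive at once

    with multiprocessing.Pool() as pool:
        while True:
            batch = list(itertools.islice(combos, batch_size))
            if not batch:
                break
            for result in pool.imap_unordered(dummy_func, batch):
                pass  # handle each result here, then let it be garbage collected

if __name__ == "__main__":
    main()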