
I have 10 long arrays, arr1, arr2, ..., arr10, and each gets passed to a function foo. Each arr is a list of around 100 million floats. I'd like to use multiprocessing. Something like

import multiprocessing as mp

def foo(arr):
    # ...call external module to process arr...
    # return a float
    ...


myarrays = [arr1, ..., arr10]
with mp.Pool(8) as pool:
    result = pool.map(foo, ((arr,)  for arr in myarrays))

# I don't need myarrays anymore

for res in result:
    # ...do more...
    ...

The problem that I think I'm having (I may be wrong) is that each of the 8 processes seems to be consuming a large amount of memory, and the program seems to hang once the calls to foo are complete. Is there a better way to handle this?

theQman
  • Processes in Python make a copy of all of the data, so each process has its own copy of each array. See this answer on how to share memory between processes: https://stackoverflow.com/questions/7894791/use-numpy-array-in-shared-memory-for-multiprocessing – Tom McLean Mar 08 '23 at 17:00
  • Another approach is to wrap them in a generator. That way, a value is only produced when it is actually needed. – Minh-Long Luu Mar 08 '23 at 17:04
  • @TomMcLean does this mean I would need to create an `mp.Array` for each of my 10 array objects? – theQman Mar 08 '23 at 18:43
  • If you create the arrays in shared memory with `mp.Array`, then you will have one copy of each array instead of two. But you need to create each array efficiently. If you first create a "regular" array and use it to initialize the shared array, then you will be using more memory. If you have to do it that way, then create the shared arrays one at a time, deleting the "regular" versions as you go. By the way, your code is not passing an array to `foo` but rather a `tuple` with a single element that is an array. Why not just `result = pool.map(foo, myarrays)`? – Booboo Mar 09 '23 at 21:04

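As a rough sketch of what the comments suggest (not part of the original question): assuming NumPy is available and the fork start method (the Linux default), each array can live once in shared memory and the workers can look it up by index instead of receiving a pickled copy. The name make_shared_array and the arr.sum() body are illustrative stand-ins for the real external call.

import multiprocessing as mp

import numpy as np

# Filled in before the Pool is created; with the fork start method (the
# Linux default) the workers inherit this list without copying the buffers.
shared_arrays = []

def make_shared_array(values):
    # mp.RawArray allocates the buffer in shared memory; a NumPy view on it
    # lets us fill and read the data without an extra copy.
    raw = mp.RawArray('d', len(values))
    np.frombuffer(raw, dtype=np.float64)[:] = values
    return raw

def foo(index):
    # The worker receives only an index, not hundreds of MB of pickled floats.
    arr = np.frombuffer(shared_arrays[index], dtype=np.float64)
    return float(arr.sum())  # stand-in for the call into the external module

if __name__ == '__main__':
    # Hypothetical stand-ins for arr1 ... arr10 (much smaller for the demo).
    sources = [np.random.rand(1_000_000) for _ in range(10)]
    shared_arrays = [make_shared_array(s) for s in sources]
    del sources  # drop the non-shared copies

    with mp.Pool(4) as pool:
        results = pool.map(foo, range(len(shared_arrays)))
    print(results)

With this layout there is one copy of each array in shared memory rather than one copy per worker process.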
2 Answers


You have coded:

with mp.Pool(8) as pool:
    result = pool.map(foo, ((arr,)  for arr in myarrays))

But the map method will take any iterable for which it cannot determine the number of elements (in this case the generator expression you created) and first convert it into a list, which does have a __len__ method. So you save nothing by using a generator expression. You might as well have coded:

with mp.Pool(8) as pool:
    result = pool.map(foo, myarrays)

If your memory is restricted, you may have to give up on the idea of working on 8 arrays in parallel. As has been mentioned in comments, creating the arrays in shared memory will by itself save memory. But let's look at the simpler case of using "regular" arrays. Ideally you would not want to create your 10 arrays at once but instead generate them one by one with a generator function. For example:

def generate_arrays():
    for i in range(10):
        # Generate the next array:
        ...
        arr = result_of_previous_calculations
        yield arr
        del arr

Then suppose we determine that we only have enough memory to process 4 arrays in parallel. So you create a pool of size 4 and use the apply_async method instead, submitting the first 4 tasks. In each apply_async call you specify the callback and error_callback arguments, naming a function that will submit the next task, if any, as each task completes:

import multiprocessing
import time
from threading import Event

NUMBER_ARRAYS = 10

def generate_arrays():
    for i in range(NUMBER_ARRAYS):
        arr = [i] * i # for demo purposes
        yield arr
        del arr

def foo(arr):
    import random
    time.sleep(random.random() * 2.0) #simulate work
    print(arr, flush=True)
    return len(arr)

# No more than 4 processes running in parallel:
POOL_SIZE = 4

if __name__ == '__main__':
    def my_callback(_):
        # a task completed so submit the next task if any:
        try:
            if not all_tasks_submitted.is_set():
                results.append(pool.apply_async(foo,
                                                args=(next(iterator),),
                                                callback=my_callback,
                                                error_callback=my_callback
                                                )
                               )
        except StopIteration:
            # No more tasks to submit:
            all_tasks_submitted.set()

    all_tasks_submitted = Event()
    iterator = generate_arrays()
    pool = multiprocessing.Pool(POOL_SIZE)
    # submit the first POOL_SIZE tasks:
    results = [
        pool.apply_async(
            foo,
            args=(next(iterator),),
            callback=my_callback,
            error_callback=my_callback
            )
            for _ in range(POOL_SIZE)
    ]

    # Wait for all tasks to have been submitted:
    all_tasks_submitted.wait()

    # Now wait for all tasks to complete.
    pool.close()
    pool.join()
    # Return values from foo can be gotten from results
    print('results:', [result.get() for result in results])

Prints:

[2, 2]
[1]
[3, 3, 3]
[]
[5, 5, 5, 5, 5]
[4, 4, 4, 4]
[7, 7, 7, 7, 7, 7, 7]
[6, 6, 6, 6, 6, 6]
[9, 9, 9, 9, 9, 9, 9, 9, 9]
[8, 8, 8, 8, 8, 8, 8, 8]
results: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Booboo

You can always restrict memory usage with code like this, for example:

import resource
  
def limit_memory(maxsize):
    soft, hard = resource.getrlimit(resource.RLIMIT_AS)
    resource.setrlimit(resource.RLIMIT_AS, (maxsize, hard))

To better understand what this does, you can read the documentation:

https://docs.python.org/3/library/resource.html
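For instance, assuming a hypothetical cap of 2 GiB per worker, the limit could be applied in each pool worker via the initializer argument. This is only a sketch: resource.RLIMIT_AS is Unix-only, and allocations beyond the cap raise MemoryError in the worker rather than being deferred.

import multiprocessing as mp
import resource

def limit_memory(maxsize):
    # Lower the soft limit on the address space of the calling process.
    _soft, hard = resource.getrlimit(resource.RLIMIT_AS)
    resource.setrlimit(resource.RLIMIT_AS, (maxsize, hard))

def init_worker():
    # Hypothetical cap: 2 GiB per worker process.
    limit_memory(2 * 1024 ** 3)

def foo(arr):
    return sum(arr)  # stand-in for the real work

if __name__ == '__main__':
    with mp.Pool(4, initializer=init_worker) as pool:
        print(pool.map(foo, [[1.0, 2.0], [3.0, 4.0]]))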