You have coded:

with mp.Pool(8) as pool:
    result = pool.map(foo, ((arr,) for arr in myarrays))

But when the map method is passed an iterable whose length it cannot determine (in this case, your generator expression), it first converts it into a list, which does have a __len__ method. So you save nothing by using a generator expression. You might as well have coded:
with mp.Pool(8) as pool:
    result = pool.map(foo, myarrays)
If your memory is restricted, you may have to give up on the idea of working on 8 arrays in parallel. As has been mentioned in comments, creating the arrays in shared memory will by itself save memory. But let's look at the simpler case of using "regular" arrays. Ideally you would not want to create your 10 arrays at once but instead generate them one by one with a generator function. For example:
def generate_arrays():
    for i in range(10):
        # Generate the next array:
        ...
        arr = result_of_previous_calculations
        yield arr
        del arr  # drop our reference so the array can be freed
Suppose you then determine that you only have enough memory to process 4 arrays in parallel. So you create a pool of size 4 and instead use the apply_async method, submitting just the first 4 tasks. But in each apply_async call you pass callback and error_callback arguments naming a function that submits the next task, if any, as each task completes:
import multiprocessing
import time
from threading import Event

NUMBER_ARRAYS = 10

def generate_arrays():
    for i in range(NUMBER_ARRAYS):
        arr = [i] * i  # for demo purposes
        yield arr
        del arr

def foo(arr):
    import random

    time.sleep(random.random() * 2.0)  # simulate work
    print(arr, flush=True)
    return len(arr)

# No more than 4 processes running in parallel:
POOL_SIZE = 4

if __name__ == '__main__':
    def my_callback(_):
        # A task completed, so submit the next task, if any:
        try:
            if not all_tasks_submitted.is_set():
                results.append(pool.apply_async(foo,
                                                args=(next(iterator),),
                                                callback=my_callback,
                                                error_callback=my_callback
                                                )
                               )
        except StopIteration:
            # No more tasks to submit:
            all_tasks_submitted.set()

    all_tasks_submitted = Event()
    iterator = generate_arrays()
    pool = multiprocessing.Pool(POOL_SIZE)
    # Submit the first POOL_SIZE tasks:
    results = [
        pool.apply_async(
            foo,
            args=(next(iterator),),
            callback=my_callback,
            error_callback=my_callback
        )
        for _ in range(POOL_SIZE)
    ]
    # Wait for all tasks to have been submitted:
    all_tasks_submitted.wait()
    # Now wait for all tasks to complete:
    pool.close()
    pool.join()
    # Return values from foo can be gotten from the results list:
    print('results:', [result.get() for result in results])
Prints (the ordering of the lines will vary from run to run because of the random sleep times):
[2, 2]
[1]
[3, 3, 3]
[]
[5, 5, 5, 5, 5]
[4, 4, 4, 4]
[7, 7, 7, 7, 7, 7, 7]
[6, 6, 6, 6, 6, 6]
[9, 9, 9, 9, 9, 9, 9, 9, 9]
[8, 8, 8, 8, 8, 8, 8, 8]
results: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
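As an aside, the shared-memory approach mentioned in the comments can be sketched with the multiprocessing.shared_memory module (Python 3.8+). This is just an illustration with raw bytes; in practice you would typically wrap the buffer in a numpy array of the desired dtype and shape. The point is that another process attaches to the same block by name, so the array data is never pickled and copied:

```python
from multiprocessing import shared_memory

# Create a shared block and fill its first bytes:
shm = shared_memory.SharedMemory(create=True, size=16)
shm.buf[:4] = bytes([1, 2, 3, 4])

# A worker process would attach to the same block by name,
# seeing the same underlying memory rather than a copy:
attached = shared_memory.SharedMemory(name=shm.name)
seen = bytes(attached.buf[:4])
print(seen)  # b'\x01\x02\x03\x04'

attached.close()
shm.close()
shm.unlink()  # free the block once no process needs it
```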