I am trying to use Python's multiprocessing module to speed up processing. However, when I run the test_parallel_compute function at the very bottom of the code on a computing cluster with 32 nodes (EDIT: I've found out that I'm only running across one node), the program is actually slower with multiprocessing: 1024 seconds with 32 processes vs. 231 seconds with no multiprocessing at all. 1022 of those seconds were spent in the pool.map call within the parallel_compute_new_2 function, so the time is not limited by partitioning the inputs nor by joining the return values.
I have an input list (b) and several other arguments (a and c) to the function (test_function_2). In order to prepare these for the multiple processes, I partition b. I then build a complete argument list for each slice and hand the function and these argument lists to pool.map, which calls test_function_2 on each partitioned slice.
QUESTION EDITED: Can you see any inefficiencies in how the work is mapped to the multiple processes below? Again, 1022 of the seconds were spent in the pool.map call within the parallel_compute_new_2 function, so the time is not limited by partitioning the inputs nor by joining the return values.
I am running this with inputs of a = 100.0, b = range(10000000), and c = 15.0.
Thank you!
import multiprocessing as mup

# Partition the input list into `number` roughly equal contiguous slices.
def partition_inputs(input, number):
    num_inputs = len(input)
    return [input[num_inputs * i // number:num_inputs * (i + 1) // number] for i in range(number)]
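Just to sanity-check what partition_inputs returns, here it is on a short list (the slice boundaries use integer division, so the later slices absorb any remainder):

print(partition_inputs(list(range(10)), 4))
# [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]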
# This is the function that each process is supposed to run.
# It takes in several arguments. b is a long list, which is partitioned
# into multiple slices for each process. a and c are just other numbers.
# This function's return values, d, e, and f, are joined between each process.
def test_function_2(args):
    a = args[0]
    b = args[1]
    c = args[2]
    d = []
    e = 0
    f = {}
    for i in b:
        d.append(a*i*c)
        f[i] = set([a, i, c])
    return d, e, f
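To illustrate the shape of the return values each process produces (and that later get joined):

d, e, f = test_function_2((2.0, [1, 2], 3.0))
# d == [6.0, 12.0], e == 0, f == {1: set([2.0, 1, 3.0]), 2: set([2.0, 2, 3.0])}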
def parallel_compute_new_2(function, args, input, input_index, partition_input_function,
                           join_functions_dict, pool, number=32, procnumber=32):
    # Partition the b list. In my case, the partition_input_function is
    # partition_inputs, as above.
    new_inputs = partition_input_function(input, number)

    # Since test_function_2 requires arguments (a, c) beyond the partitioned
    # list b, create a list of the complete arguments, one per slice.
    worker_function_args_list = []
    for i in range(number):
        new_args = args[:]
        new_args[input_index] = new_inputs[i]
        worker_function_args_list.append(new_args)

    returnlist = pool.map(function, worker_function_args_list)

    # Join the return values from each process.
    return_values = list(returnlist[0])
    for index in join_functions_dict:
        for proc in range(1, number):
            return_values[index] = join_functions_dict[index](return_values[index],
                                                              returnlist[proc][index])
    return return_values
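combine_dictionaries (used below) isn't shown in my snippet; it just merges two of the f dicts. A minimal version consistent with how it's called would be:

def combine_dictionaries(dict1, dict2):
    # Merge the second dict into a copy of the first. Keys never collide
    # here because each process sees a disjoint slice of b.
    combined = dict1.copy()
    combined.update(dict2)
    return combined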
def test_parallel_compute(a, b, c, number=32, procnumber=32):
    join_functions_dict = {}
    join_functions_dict[0] = lambda a, b: a + b
    join_functions_dict[2] = combine_dictionaries

    # a = 100.
    # b = range(1000000000)
    # c = 15.

    # Serial run for comparison.
    d, e, f = test_function_2((a, b, c))

    # Parallel run.
    pool = mup.Pool(processes=procnumber)
    d1, e1, f1 = parallel_compute_new_2(test_function_2, [a, b, c], b, 1,
                                        partition_inputs, join_functions_dict, pool,
                                        number=number, procnumber=procnumber)
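For reference, this is roughly how I time the two paths (a sketch; the separate Pool and the variable names here are just for illustration):

import time

a, b, c = 100.0, range(10000000), 15.0
join_functions_dict = {0: lambda x, y: x + y, 2: combine_dictionaries}

t0 = time.time()
test_function_2((a, b, c))                      # serial baseline
t1 = time.time()

pool = mup.Pool(processes=32)
parallel_compute_new_2(test_function_2, [a, b, c], b, 1, partition_inputs,
                       join_functions_dict, pool, number=32, procnumber=32)
t2 = time.time()

print("serial: %.1f s, parallel: %.1f s" % (t1 - t0, t2 - t1))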