I am trying to parallelize my code across multiple processes. The computation is mostly CPU-intensive, so I chose multiprocessing over multithreading. I am given a matrix (~300/400 elements) and a vector (~10 elements).
First, I use the pool of processes to perform i distinct operations on a copy of the original matrix/vector, obtaining i distinct new matrix/vector pairs. In the process, I also check whether a selection of columns is equal to another given matrix.
Then, for each pair obtained before, I perform further operations to check whether the vector resulting from the bitwise sum of some columns of the matrix and the vector has a given weight. This last part is repeated x times, where x dictates how many columns of the matrix I should extract.
The main problem is that the computation at some point stops without any error message. For this reason, I don't know how to debug the issue at all.
Since numpy is already multithreaded, I also set all the thread counts to 1 to avoid exhausting my hardware. However, even though I have 96 cores, spawning a pool of 48 workers still sometimes brings all of them to 100% usage.
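A possible explanation for the thread limits not taking effect, offered as a hedged sketch: the thread-count variables are generally read when the numerical libraries are loaded, so setting them after numpy has been imported may have no effect. The helper limit_native_threads below is a hypothetical name; it sets the variables and warns if numpy is already loaded, and it is meant to be called before any heavy numerical import.

```python
import os
import sys
import warnings

THREAD_VARS = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS",
               "VECLIB_MAXIMUM_THREADS", "NUMEXPR_NUM_THREADS")


def limit_native_threads(n_threads=1):
    """Set the thread-count variables.

    Returns True if numpy had not been imported yet, i.e. the limits were
    set early enough to be picked up when the libraries load.
    """
    early_enough = "numpy" not in sys.modules
    if not early_enough:
        warnings.warn("numpy is already imported; the limits may be ignored")
    for name in THREAD_VARS:
        os.environ[name] = str(n_threads)
    return early_enough


# Assumption: this call sits at the very top of the script.
limit_native_threads(1)
# import numpy as np  # numerical imports would follow here, after the call
```

Worker processes inherit the parent's environment, so setting the variables once in the parent before creating the pool should be enough under this assumption.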
I have tried to reduce the code to a minimal example that just shows the idea. What could be the problem with this code? Also, do you think this is the right approach to the problem?

import operator
import os
from multiprocessing import Pool

import numpy as np

ENVIRONMENT = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS",
               "VECLIB_MAXIMUM_THREADS", "NUMEXPR_NUM_THREADS")


def _prepare_environment():
    """This is necessary since numpy already uses a lot of threads. See
    https://stackoverflow.com/a/58195413/2326627"""
    print("Setting threads to 1")
    for env in ENVIRONMENT:
        os.environ[env] = "1"


def _op1(matrix, vector, another_matrix, params):
    matrix2 = matrix.copy()
    vector2 = vector.copy()
    # ... conditional additions of matrix2's rows and vector2 based on params...
    # Check if some columns of the matrix are equal to the other matrix given
    is_eq = np.array_equal(matrix2[:, params['range']], another_matrix)
    return (matrix2, vector2, is_eq)


def _op2(matrix, is_eq, vector, params):
    # Check bitwise sum of some columns of matrix and vector
    sum_cols_v = (matrix[:, params['range2']].sum(axis=1) + vector) % 2
    sum_cols_v_w = np.sum(sum_cols_v)
    # Check if the weight is equal to the given one
    is_correct_w = sum_cols_v_w == params['w']
    return (is_eq, is_correct_w)


def go(matrix, vector, pool):
    op1_ress = []
    for i in range(1000):  # number depends on other params, not interesting
        params = {}
        params['i'] = i
        # ...create other params...
        other_matrix = None
        # ...generate other matrix...
        res = pool.apply_async(_op1, (matrix, vector, other_matrix, params))
        op1_ress.append(res)
    # At this point we have all the results for all possible RREF
    print('op1 done')
    # We want to count the number of matrices equal to the other matrix
    n_idens = sum(i for _, _, i in map(operator.methodcaller('get'), op1_ress))
    print(n_idens)
    for x in range(5):  # number depends on other params, not interesting
        op2_ress = []
        for (matrix2, vector2, is_eq) in map(operator.methodcaller('get'),
                                             op1_ress):
            params2 = {}
            params2['x'] = x
            # ... create other params ...
            for y in range(1000):  # number depends on other params, not interesting
                # Pass a fresh dict per task: apply_async pickles its
                # arguments later, in a background thread, so mutating a
                # shared dict between submissions could race.
                res = pool.apply_async(_op2,
                                       (matrix2, is_eq, vector2,
                                        dict(params2, y=y)))
                op2_ress.append(res)
        n_weights = 0
        n_weights_given_eq = 0
        for is_eq, is_correct_w in map(operator.methodcaller('get'), op2_ress):
            if is_correct_w:
                n_weights += 1
                if is_eq:
                    n_weights_given_eq += 1
        print(n_weights)
        print(n_weights_given_eq)


def main():
    _prepare_environment()
    pool = Pool(48)
    rng = np.random.default_rng()
    matrix = rng.integers(2, size=(12, 28))
    vector = rng.integers(2, size=(12, 1))
    go(matrix, vector, pool)
    pool.close()
    pool.join()


if __name__ == '__main__':
    main()
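
On whether this is the right approach, one observation and a hedged sketch: the example submits about 5 × 1000 × 1000 tiny tasks through apply_async, and every call pickles its arguments and ships them to a worker, so the per-task overhead can easily exceed the work itself. Grouping many parameter sets into a single task is a common way to reduce that overhead. The names batched and count_weight_hits below are hypothetical, and the integer XOR is only a toy stand-in for _op2. ThreadPool is used so that the sketch runs anywhere; Pool offers the same imap_unordered call.

```python
from functools import partial
from itertools import islice
from multiprocessing.pool import ThreadPool


def batched(iterable, size):
    """Yield lists of at most `size` consecutive items."""
    iterator = iter(iterable)
    while batch := list(islice(iterator, size)):
        yield batch


def count_weight_hits(column, target_weight, y_batch):
    """Toy stand-in for many _op2 calls handled by a single task.

    For each y in the batch, XOR `column` with y and count how many
    results have Hamming weight `target_weight`.
    """
    hits = 0
    for y in y_batch:
        if bin(column ^ y).count("1") == target_weight:
            hits += 1
    return hits


if __name__ == "__main__":
    # One task per batch of 100 values instead of one task per value.
    worker = partial(count_weight_hits, 0b1010, 2)
    with ThreadPool(4) as pool:
        total = sum(pool.imap_unordered(worker, batched(range(1000), 100)))
    print(total)
```

Each worker then returns one small integer per batch, so the parent never has to hold millions of AsyncResult objects at once.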