
I am trying to parallelize my code across multiple processes. The computation is mostly CPU-intensive, so I decided to use multiprocessing rather than multithreading. The input is a matrix (about 300 to 400 elements) and a vector (about 10 elements).

First, I use the process pool to apply i distinct operations, each to a copy of the original matrix/vector, obtaining i distinct new matrix/vector pairs. Along the way, I also check whether a selection of columns equals another given matrix.

Then, for each pair obtained before, I perform further operations to check whether the vector resulting from the bitwise sum (i.e. the sum modulo 2) of some columns of the matrix and the vector has a given weight. This last part is repeated x times, where x dictates how many columns of the matrix I should extract.
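For 0/1 arrays, adding columns modulo 2 is the same as XOR-ing them, and the weight is the number of ones in the result. One shape detail matters here: `matrix[:, cols].sum(axis=1)` has shape `(rows,)`, and adding it to a `(rows, 1)` vector broadcasts to `(rows, rows)`, so either `keepdims=True` or a flattened vector is needed. A minimal sketch of the check, where `has_weight`, `cols` and `w` are hypothetical stand-ins for `_op2`, `params['range2']` and `params['w']`:

```python
import numpy as np


def has_weight(matrix, vector, cols, w):
    """Return True if (sum of selected columns + vector) mod 2 has w ones.

    Assumes `matrix` is a 2-D 0/1 integer array, `vector` is a 0/1 column
    of shape (rows, 1), and `cols` selects the columns (hypothetical names).
    """
    # XOR-reduce the selected columns; for 0/1 values this equals sum mod 2.
    col_sum = np.bitwise_xor.reduce(matrix[:, cols], axis=1)
    # Add the vector over GF(2) too; ravel() turns (rows, 1) into (rows,).
    syndrome = col_sum ^ vector.ravel()
    # The Hamming weight is the number of nonzero entries.
    return int(np.count_nonzero(syndrome)) == w


rng = np.random.default_rng(0)
matrix = rng.integers(2, size=(12, 28))
vector = rng.integers(2, size=(12, 1))
cols = [0, 3, 5]
# The modulo-2 formulation gives the same weight once shapes stay (12, 1).
mod2 = (matrix[:, cols].sum(axis=1, keepdims=True) + vector) % 2
assert has_weight(matrix, vector, cols, int(mod2.sum()))
```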

The main problem is that at some point the computation stops without any error message, so I don't know how to debug the issue at all.
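Two standard-library tools can make a silent stop visible. With `apply_async`, an exception raised inside a worker is only re-raised when `.get()` is called on that task's `AsyncResult`; an `error_callback` reports it as soon as it happens. If a worker process dies abruptly (for example, killed by the kernel's OOM killer), `multiprocessing.Pool` starts a replacement but the task it was running never completes, so a `.get()` without a timeout can block forever; `get(timeout=...)` turns that into a `multiprocessing.TimeoutError`. `faulthandler.dump_traceback_later` periodically prints the parent's stacks to show where it is blocked. A minimal sketch; `risky_task`, `report_error` and `run` are hypothetical names, and the timeout and dump interval are arbitrary:

```python
import faulthandler
import multiprocessing as mp
import sys


def risky_task(n):
    """Hypothetical task that fails for one input, to show error reporting."""
    if n == 3:
        raise ValueError(f"bad input {n}")
    return n * n


def report_error(exc):
    # Called in the parent as soon as a task fails, instead of only when
    # .get() is eventually called on that task's result.
    print(f"task failed: {exc!r}", file=sys.stderr)


def run(inputs, timeout=60.0):
    results, errors = [], []
    with mp.Pool(2) as pool:
        pending = [pool.apply_async(risky_task, (n,),
                                    error_callback=report_error)
                   for n in inputs]
        for res in pending:
            try:
                # A timeout turns a lost task (e.g. a worker killed by the
                # OOM killer) into an exception instead of an endless wait.
                results.append(res.get(timeout=timeout))
            except mp.TimeoutError:
                errors.append("timeout")
            except ValueError as exc:
                errors.append(str(exc))
    return results, errors


if __name__ == "__main__":
    # Dump all thread stacks to stderr every 10 minutes while running, so a
    # silent hang shows where the parent process is blocked.
    faulthandler.dump_traceback_later(600, repeat=True)
    try:
        print(run(range(5)))
    finally:
        faulthandler.cancel_dump_traceback_later()
```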

Since numpy already uses multiple threads internally, I set all the thread-count environment variables to 1 to avoid oversubscribing my hardware. However, even though I have 96 cores, a pool of 48 workers still sometimes brings all of them to 100% usage.
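These variables are typically read when numpy's BLAS/OpenMP backend is first loaded, so setting them inside `main()` after `import numpy` may have no effect in the parent process, and with the `fork` start method the workers inherit that already-initialized library. A sketch that sets them before the import (the list mirrors the question's `ENVIRONMENT` tuple; `THREAD_VARS` is a hypothetical name):

```python
import os

# Set thread limits *before* numpy is imported anywhere in this process,
# since BLAS/OpenMP backends usually read them when they are first loaded.
THREAD_VARS = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS",
               "VECLIB_MAXIMUM_THREADS", "NUMEXPR_NUM_THREADS")
for var in THREAD_VARS:
    os.environ[var] = "1"

import numpy as np  # noqa: E402  (deliberately imported after the settings)

# Processes started by multiprocessing inherit this environment.
print({var: os.environ[var] for var in THREAD_VARS})
```

If changing the import order is awkward, the third-party `threadpoolctl` package provides `threadpool_limits(limits=1)` to cap BLAS/OpenMP threads at runtime instead.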

I have tried to reduce the code to a minimal example that just shows the idea. What could be the problem with this code? Also, do you think this is the right approach to the problem?

```python
import operator
import os
from multiprocessing import Pool

import numpy as np

ENVIRONMENT = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS",
               "VECLIB_MAXIMUM_THREADS", "NUMEXPR_NUM_THREADS")


def _prepare_environment():
    """This is necessary since numpy already uses a lot of threads. See
https://stackoverflow.com/a/58195413/2326627"""
    print("Setting threads to 1")
    for env in ENVIRONMENT:
        os.environ[env] = "1"


def _op1(matrix, vector, another_matrix, params):
    # Work on copies so the original matrix/vector are left untouched
    matrix2 = matrix.copy()
    vector2 = vector.copy()
    # ... conditional additions of matrix2's rows and vector2 based on params...

    # Check if some columns of the matrix are equal to the other matrix given
    is_eq = np.array_equal(matrix2[:, params['range']], another_matrix)
    return (matrix2, vector2, is_eq)


def _op2(matrix, is_eq, vector, params):
    # Check bitwise sum (mod 2) of some columns of matrix and vector;
    # keepdims=True keeps shape (rows, 1) so it adds element-wise to vector
    sum_cols_v = (matrix[:, params['range2']].sum(axis=1, keepdims=True)
                  + vector) % 2
    sum_cols_v_w = np.sum(sum_cols_v)
    # Check if the weight is equal to the given one
    is_correct_w = sum_cols_v_w == params['w']
    return (is_eq, is_correct_w)


def go(matrix, vector, pool):
    op1_ress = []
    for i in range(1000):  # number depends on other params, not interesting
        params = {}
        params['i'] = i
        # ...create other params...
        other_matrix = None
        # ...generate other matrix...
        res = pool.apply_async(_op1, (matrix, vector, other_matrix, params))
        op1_ress.append(res)
    # At this point the op1 tasks for all possible RREFs have been submitted;
    # the .get() calls below wait for them to finish
    print('op1 submitted')

    # We want to count the number of matrices equal to another matrix
    n_idens = sum(i for _, _, i in map(operator.methodcaller('get'), op1_ress))
    print(n_idens)

    for x in range(5):  # number depends on other params, not interesting
        op2_ress = []
        for (matrix2, vector2, is_eq) in map(operator.methodcaller('get'),
                                             op1_ress):
            params2 = {}
            params2['x'] = x
            # ... create other params ...
            for y in range(1000):  # number depends on other params, not interesting
                # Use a fresh dict per task: apply_async may pickle its
                # arguments later, in the pool's task-handler thread, so
                # mutating one shared dict can alter already-submitted tasks
                task_params = dict(params2, y=y)
                res = pool.apply_async(_op2,
                                       (matrix2, is_eq, vector2, task_params))
                op2_ress.append(res)

        n_weights = 0
        n_weights_given_eq = 0

        for is_eq, is_correct_w in map(operator.methodcaller('get'), op2_ress):
            if is_correct_w:
                n_weights += 1
                if is_eq:
                    n_weights_given_eq += 1
        print(n_weights)
        print(n_weights_given_eq)


def main():
    _prepare_environment()

    pool = Pool(48)
    rng = np.random.default_rng()
    matrix = rng.integers(2, size=(12, 28))
    vector = rng.integers(2, size=(12, 1))
    go(matrix, vector, pool)
    pool.close()
    pool.join()


if __name__ == '__main__':
    main()
```
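On the "right approach" question: in `go()` each `x` iteration submits one `_op2` task per (op1 result, `y`) combination, i.e. up to 1,000 × 1,000 tasks in the example, each pickling its own copy of `matrix2`, `vector2` and the params, and every `AsyncResult` is kept in `op2_ress` until the end. One possible alternative, sketched here under assumptions: batch all column selections for one pair into a single task and stream results through `imap_unordered`, counting them as they arrive. `count_weights`, `go_streaming`, and the use of `itertools.combinations` to enumerate column selections are hypothetical stand-ins for the elided per-`y` parameters:

```python
from itertools import combinations
from multiprocessing import Pool

import numpy as np


def count_weights(args):
    """Hypothetical batched _op2: one task per (matrix, vector) pair.

    Checks every selection of `x` columns (an assumption standing in for the
    elided per-y parameters); returns (n_weights, n_weights_given_eq).
    """
    matrix, vector, is_eq, x, w = args
    n_weights = 0
    for cols in combinations(range(matrix.shape[1]), x):
        # Same mod-2 sum as _op2, with keepdims so shapes stay (rows, 1).
        syndrome = (matrix[:, list(cols)].sum(axis=1, keepdims=True)
                    + vector) % 2
        if int(syndrome.sum()) == w:
            n_weights += 1
    return n_weights, (n_weights if is_eq else 0)


def go_streaming(pairs, x, w, processes=4):
    # pairs: iterable of (matrix2, vector2, is_eq) tuples, i.e. op1 results.
    tasks = ((m, v, eq, x, w) for m, v, eq in pairs)
    n_weights = n_weights_given_eq = 0
    with Pool(processes) as pool:
        # Results are counted as they arrive; chunksize sends several tasks
        # per inter-process message to reduce pickling overhead.
        for nw, nwe in pool.imap_unordered(count_weights, tasks, chunksize=8):
            n_weights += nw
            n_weights_given_eq += nwe
    return n_weights, n_weights_given_eq


if __name__ == "__main__":
    rng = np.random.default_rng()
    pairs = [(rng.integers(2, size=(12, 28)), rng.integers(2, size=(12, 1)),
              False) for _ in range(20)]
    print(go_streaming(pairs, x=2, w=4))
```

With this layout the number of tasks equals the number of op1 results rather than growing with the number of `y` values; `chunksize=8` is only a tuning knob, not a recommended value.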
tigerjack
  • Do you multiply or diagonalize matrices? Most `numpy` operations are single-threaded. I would try to use multithreading first. – azelcer Feb 08 '22 at 21:25
  • @azelcer I was under the impression that the GIL was preventing me from running multiple threads at once. – tigerjack Feb 09 '22 at 11:48
