
I wrote a Python program to parallelize a batch of matrix multiplications. The following code splits the entire calculation across four independent processes. (To simplify the demonstration, I removed the code that collects the results.) Since the matrix multiplications need to be repeated thousands of times, the four processes are kept alive and are paused and resumed through pipes. The problem is that when I resume only one process, it finishes very quickly (0.13 s). However, the time consumed by each process increases as more processes are resumed (0.31 s for two, 0.45 s for three, 0.6 s for four). Since the four processes are totally independent, I really don't understand what is going on here.

import multiprocessing as mp
import numpy as np
import time

def worker(a,b,Ind):
    st = time.time()
    M = len(a)
    a = np.reshape(a,[1,M])
    NN = b.shape[1]
    result = np.zeros([NN,len(Ind)])
    # loop for delay points
    for ii in range(0,len(Ind)):
        temp = a.dot(b[Ind[ii]:Ind[ii]+M,:])
        result[:,ii] = np.abs(temp)**2

    print(f'Elapsed time {time.time()-st} for {len(Ind)} loops')
    return result

def missions(a,b,Ind,Bob):
    while True:
        # wait until receive something
        flag = Bob.recv()
        if flag==1:
            temp = worker(a,b,Ind)
            Bob.send(1)


if __name__=='__main__':
    N = 4
    M = 160
    procs = []
    pipes = []
    Ind0 = np.array(range(1600))*10
    a = np.random.random(1998)
    b = np.random.random((17988,700))+1j*np.random.random((17988,700))
    for ii in range(N):
        Ind_trace = np.arange(M)+ii*M  # already an integer array, no astype needed
        Ind = Ind0[Ind_trace]
        # add pipes
        Alice,Bob = mp.Pipe()
        pipes.append({'Alice':Alice,'Bob':Bob})
        # add process
        procs.append(mp.Process(target=missions,args=(a,b,Ind,pipes[ii]['Bob'])))
    
    for p in procs:
        p.start()
    
    # start process
    tasks = [0,1,2,3]
    #tasks = list([0])
    # start tasks
    for ii in tasks:
        pipes[ii]['Alice'].send(1) # send signal to Bob to start mission

    # check whether all the tasks are finished
    while(True):
        if len(tasks)==0:
            break
        for ii in list(tasks):  # iterate over a copy because tasks is modified below
            if pipes[ii]['Alice'].poll():
                if pipes[ii]['Alice'].recv()==1:  #finished
                    tasks.remove(ii)

    for p in procs:
        p.terminate()
        p.kill()
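
The polling loop above spins in the main process while the workers run, so it may compete with them for CPU time. Below is a minimal sketch of a blocking alternative, assuming the same convention as the code above (a worker sends 1 when it finishes). `wait_until_done` is a hypothetical helper name introduced for illustration; `multiprocessing.connection.wait` is the standard-library call it relies on.

```python
import multiprocessing as mp
from multiprocessing.connection import wait


def wait_until_done(parent_conns):
    """Block until every connection has delivered the 'finished' flag (1).

    parent_conns maps a task id to the parent ("Alice") end of its pipe.
    Returns the task ids in the order in which they were seen to finish.
    """
    pending = dict(parent_conns)  # copy, so the caller's dict is not modified
    finished = []
    while pending:
        # wait() sleeps until at least one connection has data to read,
        # so the parent does not burn CPU while the workers compute.
        ready = wait(list(pending.values()))
        for task_id, conn in list(pending.items()):
            if conn in ready and conn.recv() == 1:
                finished.append(task_id)
                del pending[task_id]
    return finished
```

With the names from the code above, the call would look like `wait_until_done({ii: pipes[ii]['Alice'] for ii in tasks})`. Whether this changes the measured times is untested here; it only removes one possible source of contention.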
General Grievance
Xiaowei
  • With such a small problem, you may just be seeing the overhead of starting new processes – ti7 Aug 06 '21 at 15:43
  • Since you're thinking about performance, I would recommend using `Python Profiling`, so you have a leg to stand on in the future https://docs.python.org/3/library/profile.html , and this post to get you started: https://stackoverflow.com/questions/11041683/python-multiprocess-profiling – pyeR_biz Aug 06 '21 at 19:37
  • @ti7, I don't think it's an overhead issue, since all the processes have already been started at the beginning. They wait there until messages are sent through pipes. So the real calculation for each process is triggered by a pipe signal. Besides, the consumed time was measured inside the worker function, it took into account only the real calculation time. – Xiaowei Aug 07 '21 at 01:27
  • MP is only faster when the problem is CPU bound, there are available CPU cycles (e.g. on other cores) which can be used, and the overhead of launching the processes **and communicating with them** is outweighed by the gain. Some of these conditions may not apply here. NumPy is pretty heavily optimised already: that might be the problem. – 2e0byo Jul 21 '22 at 16:08
  • Relevant discussion with a very similar problem https://stackoverflow.com/questions/72978588/diagnosing-python-multiprocessing-bottleneck – Charchit Agarwal Jul 21 '22 at 17:43
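
One way to test the suggestion in the comments that NumPy's own optimisation is involved is to pin the underlying BLAS library to one thread per process and time the same products again. This is a minimal sketch and rests on an assumption: that NumPy is linked against a multithreaded BLAS such as OpenBLAS or MKL, which honours the environment variables below. `time_products` is a hypothetical helper that mirrors the loop in `worker()`.

```python
import os

# Assumption: NumPy uses a multithreaded BLAS (OpenBLAS, MKL, ...).
# These variables are read when the library loads, so set them in the
# parent before importing numpy; child processes inherit them.
for var in ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS"):
    os.environ[var] = "1"

import time
import numpy as np


def time_products(a, b, starts):
    """Time the same vector-matrix products as worker() in the question.

    a is a 1-D array, b a 2-D array, and starts the row offsets into b.
    Returns (elapsed_seconds, result).
    """
    m = len(a)
    result = np.zeros((b.shape[1], len(starts)))
    t0 = time.perf_counter()
    for k, s in enumerate(starts):
        # product of a with an m-row slice of b, then squared magnitude
        result[:, k] = np.abs(a @ b[s:s + m, :]) ** 2
    return time.perf_counter() - t0, result
```

If the per-process time stays roughly flat as more processes are resumed with the thread count pinned to 1, thread oversubscription was a likely contributor. If it still grows, memory bandwidth is another suspect, since each product in the question reads a 1998 x 700 complex slice of `b` (roughly 22 MB) and all processes share the same memory bus. Neither outcome is verified here.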

0 Answers