I wrote a python program to parallelize a bunch of matrix multiplications. The following code allot the entire calculation to four independent processes. (To simplify the demonstration, I deleted the code of collecting the results.) Since the matrix multiplications need to be repeated thousands of times, the four processes are kept alive and can be paused/resumed with pipes. The problem is that when I only resume one process, it will be finished very quickly (0.13s). However, the consumed time for each process increases when more processes are resumed. (0.31s for two, 0.45s for three, 0.6s for four). Since the four processes are totally independent, I really don't understand what is going on here.
import multiprocessing as mp
import numpy as np
import time
def worker(a,b,Ind):
st = time.time()
M = len(a)
a = np.reshape(a,[1,M])
NN = b.shape[1]
result = np.zeros([NN,len(Ind)])
# loop for delay points
for ii in range(0,len(Ind)):
temp = a.dot(b[Ind[ii]:Ind[ii]+M,:])
result[:,ii] = np.abs(temp)**2
print(f'Elapsed time {time.time()-st} for {len(Ind)} loops')
return result
def missions(a,b,Ind,Bob):
while True:
# wait until receive something
flag = Bob.recv()
if flag==1:
temp = worker(a,b,Ind)
Bob.send(1)
if __name__=='__main__':
N = 4
M = 160
procs = []
pipes = []
Ind0 = np.array(range(1600))*10
a = np.random.random(1998)
b = np.random.random((17988,700))+1j*np.random.random((17988,700))
for ii in range(N):
Ind_trace = np.array(range(0,M))+ii*M
Ind_trace.astype(int)
Ind = Ind0[Ind_trace]
# add pipes
Alice,Bob = mp.Pipe()
pipes.append({'Alice':Alice,'Bob':Bob})
# add process
procs.append(mp.Process(target=missions,args=(a,b,Ind,pipes[ii]['Bob'])))
for p in procs:
p.start()
# start process
tasks = [0,1,2,3]
#tasks = list([0])
# start tasks
for ii in tasks:
pipes[ii]['Alice'].send(1) # send signal to Bob to start mission
# check if all the tasks is finished
while(True):
if len(tasks)==0:
break
for ii in tasks:
if pipes[ii]['Alice'].poll():
if pipes[ii]['Alice'].recv()==1: #finished
tasks.remove(ii)
for p in procs:
p.terminate()
p.kill()