I have a data series with a uniform distribution. I wish to exploit the distribution to sort the data in parallel. For N CPUs, I essentially define N buckets and sort the buckets in parallel. My problem is that I do not get a speedup.
What is wrong?
from multiprocessing import Process, Queue
from numpy import array, linspace, arange, where, cumsum, zeros
from numpy.random import rand
from time import time
def my_sort(x, y):
    """Worker: read one bucket array from queue *x* and put its argsort on *y*."""
    y.put(x.get().argsort())


def my_par_sort(X, np):
    """Bucket-sort X in parallel across *np* processes.

    X is split into *np* equal-width value buckets (valid because the data is
    roughly uniform); each bucket is argsorted in a child process and the
    per-bucket orderings are stitched back into global indices.

    Returns an int array of indices of X in ascending order — the same
    contract as X.argsort().
    """
    # Bucket boundaries: np equal-width intervals spanning [X.min(), X.max()].
    bmin = linspace(X.min(), X.max(), np + 1)   # bucket lower bounds
    bmax = array(bmin)
    bmax[-1] = X.max() + 1                      # make the last interval inclusive

    # BUG FIX: the original used ONE shared input queue and ONE shared output
    # queue for all workers.  Workers consume input and finish in arbitrary
    # order, so Yq.get() in iteration i could return the sorted indices of a
    # *different* bucket, silently producing a wrong permutation.  Give each
    # worker its own queue pair so bucket i always maps to result i.
    in_qs = [Queue() for _ in range(np)]
    out_qs = [Queue() for _ in range(np)]

    procs = []
    buckets = []   # global indices belonging to each bucket
    sizes = [0]    # per-bucket sizes, later cumsum'd into output offsets
    for i in range(np):
        mask = array([bmin[i] <= X, X < bmax[i + 1]]).all(0)
        buckets.append(where(mask)[0])
        sizes.append(len(buckets[-1]))
        in_qs[i].put(X[mask])
        p = Process(target=my_sort, args=(in_qs[i], out_qs[i]))
        p.start()
        procs.append(p)

    offsets = cumsum(sizes).tolist()
    # dtype=int: Y holds indices; the original float zeros() silently
    # produced float-valued indices.
    Y = zeros(len(X), dtype=int)
    for i in range(np):
        # get() BEFORE join(): a worker blocked putting a large result on its
        # queue would deadlock if we joined first.
        order = out_qs[i].get()
        Y[arange(offsets[i], offsets[i + 1])] = buckets[i][order]
        procs[i].join()
    return Y
if __name__ == '__main__':
    # num_el must be an int: rand(1e7) raises TypeError on modern numpy.
    num_el = int(1e7)
    mydata = rand(num_el)
    np = 4  # could use multiprocessing.cpu_count()

    # Time the parallel bucket sort.
    starttime = time()
    I = my_par_sort(mydata, np)
    # print() calls (single argument) work on both Python 2 and 3;
    # the original Python-2 print statements are a syntax error on 3.
    print("Sorting %0.0e keys took %0.1fs using %0.0f processes"
          % (len(mydata), time() - starttime, np))

    # Time the serial baseline for comparison.
    starttime = time()
    I2 = mydata.argsort()
    print("in serial it takes %0.1fs" % (time() - starttime))

    # Verify the parallel result matches the serial argsort.
    print((I == I2).all())