I have written a program which receives large-ish data sets as input (~150 MB text files), does some math on them, and then reports the results in a histogram. The number of calculations that must be performed is proportional to the number of combinations of two points in the data set, which is extremely large (~5 billion) for a data set of 1 million points.
I was hoping to reduce the computation time by using Python's multiprocessing module to distribute the calculation of partial histogram data across several worker processes, while keeping an array for the final histogram in shared memory so that each process can add to it.
I have created a working version of this program with multiprocessing, based generally on the procedure described in this answer; however, I found that it is actually marginally slower than the un-parallelized version I had written previously. I tried removing the synchronization on the shared array, which speeds things up significantly but causes a portion of the data to be lost (the un-synchronized variant is shown after the outline below).
Here's a general outline of the code:
import numpy as np
from multiprocessing import Pool, Array
BINS = 200
DMAX = 3.5
DMIN = 0
def init(histo):
    global histo_shared
    histo_shared = histo

def to_np_array(mp_array):
    # view the shared Array's buffer with a dtype matching its 'l' typecode;
    # np.frombuffer defaults to float64, which garbles the integer counts
    return np.frombuffer(mp_array.get_obj(), dtype=np.dtype('l'))

# synchronize access to shared array
def calc_sync(i):
    with histo_shared.get_lock():
        calc_histo(i)

def calc_histo(i):
    # create new array 'd_new' by doing some math on DATA using argument i
    histo = to_np_array(histo_shared)
    histo += np.histogram(d_new, bins=BINS,
                          range=(DMIN, DMAX))[0]
def main():
    # read in data and calculate no. of iterations
    global DATA
    DATA = np.loadtxt("data.txt")
    it = len(DATA) // 2

    # create shared array
    histo_shared = Array('l', BINS)

    # write to shared array from different processes
    p = Pool(initializer=init, initargs=(histo_shared,))
    for i in range(1, it + 1):
        p.apply_async(calc_sync, [i])
    p.close()
    p.join()

    histo_final = to_np_array(histo_shared)
    np.savetxt("histo.txt", histo_final)

if __name__ == '__main__':
    main()
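For completeness, the un-synchronized variant I mentioned is essentially the same code, except that the loop in main() submits calc_histo directly instead of calc_sync, so the read-modify-write of the shared array happens without holding the lock:

    # un-synchronized variant: no lock around 'histo += ...', so two processes
    # can read the same counts and one update can overwrite the other -- which
    # is presumably where the lost data comes from
    for i in range(1, it + 1):
        p.apply_async(calc_histo, [i])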
Is there something I'm missing here that's having a serious impact on my performance? Is there any way I can get around this issue to speed things up?
Any insights or suggestions are greatly appreciated!