
I have written a program which receives large-ish data sets as input (~150 MB text files), does some math on them, and then reports the results in a histogram. The number of calculations that must be performed is proportional to the number of combinations of two points in the data set, which is extremely large (~5 billion) for a data set of 1 million points.

I was hoping to cut down the computation time by using Python's multiprocessing module to distribute the calculation of partial histogram data across several processes, while keeping an array for the final histogram in shared memory so that each process can add to it.

I have created a working version of this program with multiprocessing, generally based on the procedure described in this answer. However, I found that it is actually marginally slower than the un-parallelized version I had previously written. I tried un-synchronizing access to the shared array, and found that this speeds things up significantly but results in the loss of a portion of the data.
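(My understanding of where the lost data goes: without the lock, histo += ... is a read-modify-write, so two processes can read the same bin values and one of the increments gets overwritten. Here is a minimal standalone sketch of the same lost-update race on a single shared counter; this is illustrative only, not my actual code:)

import numpy as np
from ctypes import c_int64
from multiprocessing import Pool, Array

def init(arr):
    global counter_shared
    counter_shared = arr

def bump(_):
    # unsynchronized read-modify-write: two processes can read the same
    # value and one of the increments is silently lost
    view = np.frombuffer(counter_shared.get_obj(), dtype=np.int64)
    view += 1

if __name__ == '__main__':
    counter = Array(c_int64, 1)
    with Pool(4, initializer=init, initargs=(counter,)) as p:
        p.map(bump, range(100000))
    print(counter[0])  # typically prints less than 100000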

Here's a general outline of the code:

import numpy as np
from multiprocessing import Pool, Array

BINS = 200
DMAX = 3.5
DMIN = 0

def init(histo):
    global histo_shared
    histo_shared = histo

def to_np_array(mp_array):
    # view the shared buffer with a dtype matching the Array's typecode
    # ('l' is a 64-bit long on most Unix platforms); np.frombuffer would
    # otherwise default to float64 and misread the integer counts
    return np.frombuffer(mp_array.get_obj(), dtype=np.int64)

# synchronize access to shared array
def calc_sync(i):
    with histo_shared.get_lock():
        calc_histo(i)

def calc_histo(i):
    # create new array 'd_new' by doing some math on DATA using argument i
    histo = to_np_array(histo_shared)
    histo += np.histogram(d_new, bins=BINS,
        range=(DMIN, DMAX))[0].astype(np.int32)

def main():
    # read in data and calculate no. of iterations
    global DATA
    DATA = np.loadtxt("data.txt")
    it = len(DATA) // 2

    # create shared array
    histo_shared = Array('l', BINS)

    # write to shared array from different processes
    p = Pool(initializer=init, initargs=(histo_shared,))
    for i in range(1, it + 1):
        p.apply_async(calc_sync, [i])
    p.close()
    p.join()

    histo_final = to_np_array(histo_shared)
    np.savetxt("histo.txt", histo_final)

if __name__ == '__main__':
    main()

Is there something I'm missing here that's having a serious impact on my performance? Is there any way I can get around this issue to speed things up?

Any insights or suggestions are greatly appreciated!


1 Answer


You are essentially locking out any parallelism you might be getting, because the lock on your data is held the entire time you are processing it.

When this method

def calc_sync(i):
    with histo_shared.get_lock():
        calc_histo(i)

is executing, you are holding a lock on the entire shared array while you compute the histogram. Notice also that

def calc_histo(i):
    # create new array 'd_new' by doing some math on DATA using argument i
    histo = to_np_array(histo_shared)
    histo += np.histogram(d_new, bins=BINS,
        range=(DMIN, DMAX))[0].astype(np.int32)

isn't doing anything with i, so it just looks like you're processing the same data over again. What is d_new? I don't see it in your listing.

Ideally, what you should be doing is taking your large dataset, slicing it into some number of chunks, processing each chunk individually, and then combining the results. Only lock the shared data, not the processing steps. This might look something like this:

def calc_histo(chunk):
    # process the chunk asynchronously (no lock needed here)
    return np.histogram(chunk, bins=BINS,
        range=(DMIN, DMAX))[0].astype(np.int32)

def calc_sync(start, stop):
    # grab a chunk of data; you likely don't need to lock this
    chunk = raw_data[start:stop]

    # actual calculation is async
    result = calc_histo(chunk)

    # lock only while merging the partial result into the shared array
    with histo_shared.get_lock():
        shared = to_np_array(histo_shared)
        shared += result

For pairwise data:

def calc_sync(part1, part2):
    output = []  # or a numpy array

    # actual calculation is async
    for x in part1:
        for y in part2:
            # do whatever computation you need and add it to output;
            # x - y is just a placeholder for your real comparison
            output.append(x - y)

    result = calc_histo(np.asarray(output))

    # lock only while merging the partial result into the shared array
    with histo_shared.get_lock():
        shared = to_np_array(histo_shared)
        shared += result

And now

p = Pool(initializer=init, initargs=(histo_shared,))
for i in range(0, len(raw_data), slice_size):
    for j in range(0, len(raw_data), slice_size):
        p.apply_async(calc_sync,
            [raw_data[i:i + slice_size], raw_data[j:j + slice_size]])
p.close()
p.join()

In words, we take pairwise cuts of the data, generate the relevant values, and then put them into a histogram. The only real synchronization you need is when you're combining data into the shared histogram.
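Putting it all together, here is a minimal self-contained sketch of the whole pipeline. The pairwise math in pair_values, the chunk size, and the assumption that the data is a 1-D array are stand-ins for whatever your actual computation needs. Note also that passing the data through initargs pickles it once per worker, which you may want to avoid for a 150 MB file:

import numpy as np
from ctypes import c_int64
from multiprocessing import Pool, Array

BINS = 200
DMIN, DMAX = 0, 3.5
SLICE = 10000  # chunk size; tune for your data

def init(histo, data):
    global histo_shared, raw_data
    histo_shared = histo
    raw_data = data

def pair_values(xs, ys):
    # placeholder for your real pairwise math: all absolute differences
    return np.abs(xs[:, None] - ys[None, :]).ravel()

def calc_sync(bounds):
    i, j = bounds
    d_new = pair_values(raw_data[i:i + SLICE], raw_data[j:j + SLICE])
    result = np.histogram(d_new, bins=BINS, range=(DMIN, DMAX))[0]
    # the lock is held only while merging the partial result
    with histo_shared.get_lock():
        shared = np.frombuffer(histo_shared.get_obj(), dtype=np.int64)
        shared += result

def main():
    data = np.loadtxt("data.txt")
    histo = Array(c_int64, BINS)
    n = len(data)
    # enumerate chunk pairs; adjust if a point must not pair with itself
    tasks = [(i, j) for i in range(0, n, SLICE) for j in range(i, n, SLICE)]
    with Pool(initializer=init, initargs=(histo, data)) as p:
        p.map(calc_sync, tasks)
    np.savetxt("histo.txt", np.frombuffer(histo.get_obj(), dtype=np.int64))

if __name__ == '__main__':
    main()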

  • Unfortunately I need the entire array for computing histogram data since it involves comparing each element in the array. i is used for determining how they are compared; sorry if I was unclear about that! Anyways, you were right about locking the processing steps being the issue. I adapted your solution into my code and it's working much faster now. Thank you! – George D. Oct 07 '15 at 21:26
  • I'm not really clear on how you want to parallelize it then. If you're making pairwise comparisons on elements in the array, then you could take two slices of the array, see my edits – dfb Oct 07 '15 at 21:31