This should be my third and final question about my attempts to speed up some statistical analysis that I am doing with Python. I have two versions of my code (single core and multiprocessing). I expected multiple cores to improve performance, since my code has to uncompress/unpack quite a few binary strings, but sadly the multiprocessing version is actually slower.
I am wondering if anyone has a possible explanation for what I observe (scroll down to the April 16th update for more information)?
The key part of the program is the function numpy_array (plus decode in the multiprocessing version). The code snippet is below (the full code is accessible via Pastebin, further below):
def numpy_array(data, peaks):
    rt_counter=0
    for x in peaks:
        if rt_counter %(len(peaks)/20) == 0:
            update_progress()
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[rt_counter][1][peak_counter][0]=float(buff2)
            else:
                data[rt_counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        rt_counter+=1
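To make the intent of the inner loop easier to see, here is a minimal sketch (an illustration under assumptions, not a tested replacement for the function above). Packing each big-endian unsigned integer with "I" and unpacking it again with "f" reinterprets the same four bytes, so each value is effectively read as a big-endian 32-bit float. The helper names decode_loop and decode_direct and the sample values are hypothetical.

```python
import base64
import struct


def decode_loop(encoded):
    """Per-value round trip, mirroring the inner loop above."""
    raw = base64.b64decode(encoded)
    count = len(raw) // 4
    out = []
    for y in struct.unpack(">%dL" % count, raw):
        # Reinterpret the 32 bits of the integer as a float.
        out.append(struct.unpack("f", struct.pack("I", y))[0])
    return out


def decode_direct(encoded):
    """Read the same bytes directly as big-endian 32-bit floats."""
    raw = base64.b64decode(encoded)
    count = len(raw) // 4
    return list(struct.unpack(">%df" % count, raw))


# Hypothetical sample: two value pairs encoded as big-endian floats.
sample = base64.b64encode(struct.pack(">4f", 100.5, 2.0, 200.25, 3.0))
loop_values = decode_loop(sample)
direct_values = decode_direct(sample)
print(loop_values == direct_values)
```

If the two functions agree on real data as well, the whole string can be decoded with a single struct.unpack call instead of two struct calls per value. With NumPy, np.frombuffer with dtype '>f4' would be the analogous call.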
The multiprocessing version does this with a set of functions; the key ones are shown below:
def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def numpy_array(shared_arr,peaks):
    processors=mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size=int(len(peaks)/processors)
        map_parameters=[]
        for i in range(processors):
            counter = i*chunk_size
            chunk=peaks[i*chunk_size:(i+1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode,map_parameters)

def decode ((chunk, counter)):
    data=tonumpyarray(shared_arr).view(
        [('f0','<f4'), ('f1','<f4',(250000,2))])
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            #with shared_arr.get_lock():
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        counter+=1
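A side note on the chunking in numpy_array above: because chunk_size is rounded down, any remainder after dividing len(peaks) by the number of processors is never handed to a worker. The sketch below compares that floor-based split with one that spreads the remainder over the first chunks. The function names are hypothetical, and the worker count of 4 is an assumption (the real value comes from mp.cpu_count()); 239 is the number of timepoints in the test file.

```python
def floor_chunks(n_items, n_workers):
    """(start, stop) pairs as produced by the floor-based split above."""
    size = n_items // n_workers
    return [(i * size, (i + 1) * size) for i in range(n_workers)]


def split_chunks(n_items, n_workers):
    """(start, stop) pairs that cover every item exactly once."""
    base, extra = divmod(n_items, n_workers)
    bounds = []
    start = 0
    for i in range(n_workers):
        # The first `extra` chunks each take one additional item.
        stop = start + base + (1 if i < extra else 0)
        bounds.append((start, stop))
        start = stop
    return bounds


floor = floor_chunks(239, 4)  # assumed: 4 workers
full = split_chunks(239, 4)
print(floor[-1], full[-1])
```

With these numbers the floor-based split stops at index 236, so the last three timepoints would stay undecoded, while the second split ends at 239.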
The full programs can be accessed via these Pastebin links:
Pastebin for single core version
Pastebin for multiprocessing version
With a file containing 239 timepoints and ~180k measurement pairs per timepoint, I observe a run time of ~2.5 minutes for the single core version and ~3.5 minutes for the multiprocessing version.
PS: The two previous questions (from my first ever attempts at parallelization):
-- April 16th --
I have been profiling my program with the cProfile library (by calling cProfile.run('main()') in the __main__ block), which shows that there is one step that is slowing everything down:
ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    23   85.859    3.733   85.859    3.733 {method 'acquire' of 'thread.lock' objects}
What I do not understand is that thread.lock objects are (to my understanding) used in threading, but should not be used in multiprocessing, as each core should run a single thread (and multiprocessing has its own locking mechanism). So how does this occur, and why does a single call take 3.7 seconds?
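One possible reading, offered as an assumption rather than a confirmed diagnosis: cProfile.run('main()') only instruments the parent process, and while the workers run, the parent waits inside pool.map for their results, which can show up as time spent in acquire on thread.lock objects. To see where a worker itself spends its time, the worker function can be run under its own profiler. The sketch below is written for Python 3 (it uses io.StringIO); run_profiled and busy_work are hypothetical names, with busy_work standing in for decode.

```python
import cProfile
import io
import pstats


def run_profiled(func, *args):
    """Run func(*args) under its own profiler; return (result, report text)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args)
    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    # Keep only the five most expensive entries by cumulative time.
    stats.sort_stats("cumulative").print_stats(5)
    return result, stream.getvalue()


def busy_work(n):
    """Hypothetical stand-in for decode(): some CPU-bound work."""
    return sum(i * i for i in range(n))


result, report = run_profiled(busy_work, 1000)
print(report)
```

Inside a pool, the same wrapper could be called from the function passed to pool.map, so that each worker reports its own hot spots instead of the parent's waiting time.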