-2

I use multiprocessing Pool to run parallel. I tried with 4 cores first in HPC with sub. When it uses 4 core, the time is reduced 4 times compared to 1 core. When I check with qstat, several times it uses 4 cores but after that just 1 core, with exactly the same code.

Could you please give some advice what is wrong with my code or the system?

import pandas as pd
import numpy as np
from multiprocessing import Pool
from datetime import datetime

t1 = pd.read_csv("template.csv",header=None)

s1 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_adfr.csv")
s2 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_dock.csv")
s3 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_gemdock.csv")
s4 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_ledock.csv")
s5 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_plants.csv")
s6 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_psovina.csv")
s7 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_quickvina2.csv")
s8 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_smina.csv")
s9 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_vina.csv")
s10 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_vinaxb.csv")

#number of core and arrays

n = 4
m = (len(t1) // n)+1
g= m*n - len(t1)
for g1 in range(g):
    t1.loc[len(t1)]=0


results=[]

def block_linear(i):

    temp = pd.DataFrame(np.zeros((m,29)))

    for a in range(0,m):
        sum_matrix = (t1.iloc[a,0]*s1) + (t1.iloc[a,1]*s2) + (t1.iloc[a,2]*s3)+ (t1.iloc[a,3]*s4) + (t1.iloc[a,4]*s5) + (t1.iloc[a,5]*s6) + (t1.iloc[a,6]*s7) + (t1.iloc[a,7]*s8) + (t1.iloc[a,8]*s9) + (t1.iloc[a,9]*s10)
        rank_sum= pd.DataFrame.rank(sum_matrix,axis=0,ascending=True,method='min') #real-True
        temp.iloc[a,:] = rank_sum.iloc[999].values
    temp['median'] = temp.median(axis=1)
    temp.index = range(i*m,(i+1)*m)
    return temp

start=datetime.now()

if __name__ == '__main__':
    pool = Pool(processes=n)
    results = pool.map(block_linear,range(0,n))

print(datetime.now()-start)

out=pd.concat(results)
out.drop(out.tail(g).index,inplace=True)
out.to_csv('test_10dock_4core.csv',index=False)

The main idea is to cut large table into smallers, run calculations and combine together.

pughon
  • 19
  • 5

1 Answers1

1

Without a more detailed usage of the multiprocessing's Pool package is really difficult to understand and help. Please notice that the Pool package does not guarantee parallelization: the _apply function, for example, only uses one worker of the Pool, and block all your executions. You can check out more details about it here and there.

But assuming you are using the library properly, you should make sure your code is fully parallelizable: an I/O operation on disk, for example, can bottleneck your parallelization and thus making your code run in only one process at a time.

I hope it helped.


[Edit] Since you provided more details about your problem, I can give more specific tips:

The first thing is that your code is zero parallel. You are just calling the same function N times. This is not how multiprocessing should work. Instead, the part that should be parallel is the one that is usually in a for loops, like the one you have inside the block_linear().

So, what I recommend to you:

You should change your code to first calculate all your weighted sum and only after that do the rest of the operations. This will help a lot with parallelization. So, put this operation in a function:

  def weighted_sum(column,df2):
    temp = pd.DataFrame(np.zeros(m))
    for a in range(0,m):
        result = (t1.iloc[a,column]*df2)
        temp.iloc[a] = result
    return temp

So then, you use pool.starmap to parallel the function for the 10 dataframes you have, something like this:

results = pool.starmap(weighted_sum,[(0,s1),(1,s2),(2,s3),....,[9,s10]])

ps: pool.starmap is similar to pool.map but accepts a list of tuple arguments. You can have more details about it here.

At last but not least, you should operate over your results to end your calculations. Since you will have one weighted_sum per column, you can apply a sum over the columns and then the rank_sum.

This is not a fully runnable code to solve your problem, but a general guide of how your should restructure your code to have a multiprocessing advantage. I recommend you to test it over a subsample of the data frames just to make sure it's working properly before you run it on all your data.