
I am multiprocessing my Python script with apply_async() as below:

import multiprocessing
import pandas as pd

def my_proc(df, id):
    # do something
    return df

df = pd.read_csv(myfile, sep='\t', header=0, dtype=object)
p = multiprocessing.Pool(50)
ids = df['id'].tolist()
result = {}
for i in range(len(ids)):
    result[ids[i]] = p.apply_async(my_proc, [df, ids[i]])

The problem I am encountering is that when the dataframe gets very large (200K rows with 75 columns), at any given time only one process is actually running while all the others are blocked in sleep mode.

If I instead save the dataframe to a CSV file, pass the filename as a parameter, and have each process open and read the CSV itself, all processes stay running, but performance becomes unacceptable because all 50 processes compete to open and read the same large CSV file.

Can anyone tell me how I can find out why and where these processes are blocked? Any suggestions for an alternative, more performant workaround?

Edits:

I am using a Linux server. I tried passing the df through a queue as shown below, but got the same result. I also return None and reduced the process count to 3 to isolate the problem:

def my_proc(q, id):
    df = q.get()
    # do something
    return None

p = multiprocessing.Pool(3)
m = multiprocessing.Manager()
q = m.Queue()
df = pd.read_csv(report_file_dups, sep='\t', header=0, dtype=object)
q.put(df)
ids = df['id'].tolist()
result = {}
for i in range(len(ids)):
    result[ids[i]] = p.apply_async(my_proc, [q, ids[i]])

Am I using the queue as it was intended?

dmornad
  • Are you on Windows? If you are, you need to put your main code in an `if __name__=="__main__":` block; otherwise multiprocessing won't work correctly. – Dan D. Dec 22 '18 at 22:39
  • Have you tried using `map` with a `chunksize` set? It looks a bit like a question I responded to recently: https://stackoverflow.com/a/53797655/1358308 – Sam Mason Dec 22 '18 at 22:42
  • It might also just be related to sending so much data around the place. The parameters for each call get "pickled" independently, and the results are pickled before being sent back, i.e. you probably want to do something so that you're not pickling a large dataframe with every call. – Sam Mason Dec 22 '18 at 22:48
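
To illustrate those last two comments (this is not code from the thread, just a minimal sketch): on a Linux server the default fork start method lets the workers inherit a module-level dataframe instead of pickling it with every call, and `map` with a `chunksize` batches the per-id tasks. The file name, pool size, chunksize and body of `my_proc` below are placeholders:

import multiprocessing
import pandas as pd

# Loaded once in the parent; with the fork start method (Linux default)
# every worker inherits this module-level dataframe instead of receiving
# a pickled copy with each task.
df = pd.read_csv('myfile.tsv', sep='\t', header=0, dtype=object)  # placeholder path

def my_proc(id):
    # placeholder: do something with the shared df for this id
    return id, len(df)

if __name__ == '__main__':
    ids = df['id'].tolist()
    with multiprocessing.Pool(8) as p:
        # chunksize batches the ids so tasks are not dispatched one at a time
        results = dict(p.map(my_proc, ids, chunksize=100))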

1 Answer


What about feeding the file into a queue, reading it line by line, and having workers consume data from the queue?
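
For completeness, here is a rough sketch of that idea, assuming a line-oriented input file and a placeholder `process_line` function (the names and sizes below are made up); as the comments that follow point out, it only helps if each line can be processed independently:

import multiprocessing

def process_line(line):
    pass  # placeholder: do something with a single line

def worker(q):
    # consume lines from the queue until a sentinel arrives
    while True:
        line = q.get()
        if line is None:  # sentinel: no more work
            break
        process_line(line)

if __name__ == '__main__':
    q = multiprocessing.Queue(maxsize=1000)  # bounded so the reader cannot outrun the workers
    workers = [multiprocessing.Process(target=worker, args=(q,)) for _ in range(4)]
    for w in workers:
        w.start()
    with open('myfile.tsv') as f:  # placeholder path
        for line in f:
            q.put(line)
    for _ in workers:
        q.put(None)  # one sentinel per worker
    for w in workers:
        w.join()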

DaLynX
  • I tried to use a queue as indicated in my edits, but the same issue persists. – dmornad Dec 23 '18 at 00:37
  • You are not splitting the data into several chunks to be processed in parallel by the workers. I do not know pandas, but it looks like you are feeding the whole dataframe as one item into the queue, so of course only one worker can `get()` it. Can't you split it into lines, or maybe chunks, that are fed into the queue with a loop? Also, if I were you I would start the workers before filling the queue, so you do not need to load everything into RAM before starting to run. – DaLynX Dec 23 '18 at 00:40
  • I cannot break up the dataframe. Each worker takes an id and needs the entire dataframe to generate its output. The idea here is to parallelize these per-id calculations across workers. – dmornad Dec 23 '18 at 01:35