
I have a list of CSV files. I want to perform a set of operations on each of them, produce a counter dict per file, and then create a master list containing the individual counter dicts from all the CSV files. I want to parallelize the processing of the CSV files and return the counter dict from each file. I found a similar solution here: How can I recover the return value of a function passed to multiprocessing.Process?

I used the solution suggested by David Cullen. That solution works perfectly for strings, but not when I try to return a counter dict or a normal dict: all the CSV files are processed up to the send_end.send(result) call, then the script hangs there forever and eventually throws a MemoryError. I am running this on a Linux server with more than sufficient memory for creating the list of counter dicts.

I used the following code:

import os
import multiprocessing
from collections import Counter

#get current working directory
cwd = os.getcwd()

#take a list of all files in cwd
files = os.listdir(cwd)

#define the function that needs to be run on every csv file
def worker(f, send_end):
    infile = open(f)
    #read lines in csv file
    lines = infile.readlines()
    #split the lines by "," and store them in a list of lists
    master_lst = [line.strip().split(",") for line in lines]
    #extract the second field of each sublist
    counter_lst = [element[1] for element in master_lst]
    print "Total elements in the list: " + str(len(counter_lst))
    #create a dictionary of element counts
    a = Counter(counter_lst)
    #send the counter dict back through the pipe
    send_end.send(a)

def main():
    jobs = []
    pipe_list = []
    for f in files:
        if f.endswith('.csv'):
            recv_end, send_end = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=worker, args=(f, send_end))
            jobs.append(p)
            pipe_list.append(recv_end)
            p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print len(result_list)

if __name__ == '__main__':
    main()

The error that I get is the following:

Process Process-42:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in worker
    a = Counter(counter_lst)
  File "/usr/lib64/python2.7/collections.py", line 444, in __init__
    self.update(iterable, **kwds)
  File "/usr/lib64/python2.7/collections.py", line 526, in update
    self[elem] = self_get(elem, 0) + 1
MemoryError
Process Process-17:
Traceback (most recent call last):
Process Process-6:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
Process Process-8:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
    self.run()
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
  File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in worker
    self._target(*self._args, **self._kwargs)
    self._target(*self._args, **self._kwargs)
  File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in worker
  File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in worker
    a = Counter(counter_lst_lst)
    a = Counter(counter_lst_lst)
    a = Counter(counter_lst_lst)
  File "/usr/lib64/python2.7/collections.py", line 444, in __init__
  File "/usr/lib64/python2.7/collections.py", line 444, in __init__
  File "/usr/lib64/python2.7/collections.py", line 444, in __init__
    self.update(iterable, **kwds)
  File "/usr/lib64/python2.7/collections.py", line 526, in update
    self[elem] = self_get(elem, 0) + 1
MemoryError
    self.update(iterable, **kwds)
    self.update(iterable, **kwds)
  File "/usr/lib64/python2.7/collections.py", line 526, in update
  File "/usr/lib64/python2.7/collections.py", line 526, in update
    self[elem] = self_get(elem, 0) + 1
    self[elem] = self_get(elem, 0) + 1
MemoryError
MemoryError
Process Process-10:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in worker
    a = Counter(counter_lst)
  File "/usr/lib64/python2.7/collections.py", line 444, in __init__
    self.update(iterable, **kwds)
  File "/usr/lib64/python2.7/collections.py", line 526, in update
    self[elem] = self_get(elem, 0) + 1
MemoryError
^Z
[18]+  Stopped                 collapse_multiprocessing_return.py

Now, if I replace "a" in send_end.send(a) with f, the filename, the script prints the number of CSV files in the directory (which is what len(result_list) amounts to in that case). But when the counter dict "a" is returned, it gets stuck forever and throws the above error.

I would like the code to pass the counter dict to the receiving end without any errors or problems. Is there a workaround? Could someone please suggest a possible solution?

P.S.: I am new to the multiprocessing module, so sorry if this question sounds naive. I also tried multiprocessing.Manager(), along the lines of the sketch below, but got a similar error.
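For reference, the Manager-based attempt looked roughly like this (a reconstructed sketch of the idea, not the exact script I ran; the counting logic is the same as above):

import os
from collections import Counter
from multiprocessing import Process, Manager

files = os.listdir(os.getcwd())

def worker(f, shared_list):
    #count the values in the second column and append the Counter
    #to a Manager-backed (process-shared) list
    with open(f) as infile:
        shared_list.append(Counter(line.strip().split(",")[1] for line in infile))

def main():
    manager = Manager()
    shared_list = manager.list()
    jobs = []
    for f in files:
        if f.endswith('.csv'):
            p = Process(target=worker, args=(f, shared_list))
            jobs.append(p)
            p.start()
    for proc in jobs:
        proc.join()
    #copy the shared list into a normal list of counter dicts
    result_list = list(shared_list)
    print len(result_list)

if __name__ == '__main__':
    main()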

rex
  • 1. Your error message is missing. 2. Please provide a [mcve] that we can run ourselves. – Alex Hall May 03 '18 at 21:27
  • @AlexHall: I have now included the entire message log up to the point where I terminated the script. – rex May 03 '18 at 21:58

1 Answer


Your traceback mentions Process Process-42:, so there are at least 42 processes being created. You're creating a process for every CSV file, which is not useful and is probably causing the memory error.

Your problem can be solved much more simply using multiprocessing.Pool.map. The worker function can also be shortened greatly:

def worker(f):
    #build one Counter per file from the second CSV column
    with open(f) as infile:
        return Counter(line.strip().split(",")[1]
                       for line in infile)

def main():
    #one worker process per CPU core; map hands each csv file to a worker
    pool = multiprocessing.Pool()
    result_list = pool.map(worker, [f for f in files if f.endswith('.csv')])

Passing no arguments to the pool means it'll create as many processes as you have CPU cores. Using more may or may not increase performance.
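If you do want to control the worker count, e.g. to keep memory usage down, you can pass it explicitly and tidy up the pool when you're done; a minimal sketch (the count of 4 is just an example):

def main():
    #cap the pool at 4 worker processes instead of one per core
    pool = multiprocessing.Pool(processes=4)
    try:
        result_list = pool.map(worker, [f for f in files if f.endswith('.csv')])
    finally:
        pool.close()
        pool.join()
    print len(result_list)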

Alex Hall