
Apologies in advance, but I am unable to post a fully working example (too much overhead in this code to distill to a runnable snippet). I will post as much explanatory detail as I can, and please do let me know if anything critical seems missing.

Running Python 2.7.5 through IDLE

I am writing a program to compare two text files. Since the files can be large (~500 MB) and each row comparison is independent, I would like to use multiprocessing to speed up the comparison. This mostly works, but I intermittently hit a Bad file descriptor error at pseudo-random points in the run. I am new to multiprocessing, so I suspect there is a problem with my implementation. Can anyone point me in the right direction?

Here is the code causing the trouble (specifically the pool.map):

    # open the input/output files
    csvReaderTest = csv.reader(open(testpath, 'r'))
    csvReaderProd = csv.reader(open(prodpath, 'r'))
    compwriter = csv.writer(open(outpath, 'wb'))

    pool = Pool()
    num_chunks = 3

    chunksTest = itertools.groupby(csvReaderTest, keyfunc)
    chunksProd = itertools.groupby(csvReaderProd, keyfunc)
    while True:
        # make a list of num_chunks chunks from each file
        groupsTest = [list(chunk) for key, chunk in itertools.islice(chunksTest, num_chunks)]
        groupsProd = [list(chunk) for key, chunk in itertools.islice(chunksProd, num_chunks)]
        # merge the two lists (pair off comparison rows)
        groups_combined = zip(groupsTest, groupsProd)
        if groups_combined:
            # http://stackoverflow.com/questions/5442910/python-multiprocessing-pool-map-for-multiple-arguments
            a_args = groups_combined  # a list - the set of combinations to be tested
            second_arg = True
            worker_result = pool.map(worker_mini_star, itertools.izip(itertools.repeat(second_arg), a_args))
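
For reference, the izip/repeat trick from that linked question (used to pass a constant flag plus one chunk to each worker) looks roughly like this in isolation; `add` and `add_star` are made-up stand-ins for my real worker functions:

    import itertools
    from multiprocessing import Pool

    def add(flag, pair):
        # toy stand-in for worker_mini: sum the pair when the flag is set
        return sum(pair) if flag else 0

    def add_star(args):
        # unpack the (flag, pair) tuple into separate arguments, like worker_mini_star
        return add(*args)

    if __name__ == '__main__':
        pairs = [(1, 2), (3, 4), (5, 6)]
        pool = Pool()
        # pair the constant flag with every item, so each task receives (True, pair)
        results = pool.map(add_star, itertools.izip(itertools.repeat(True), pairs))
        print results  # [3, 7, 11]

In my real code the items being paired with the flag are the grouped csv chunks and the worker is worker_mini_star.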

Here is the full error output (the error occurs only sometimes; other times the comparison runs to completion without problems):

Traceback (most recent call last):
  File "H:/<PATH_SNIP>/python_csv_compare_multiprocessing_rev02_test2.py", line 407, in <module>
    main(fileTest, fileProd, fileout, stringFields, checkFileLengths)
  File "H:/<PATH_SNIP>/python_csv_compare_multiprocessing_rev02_test2.py", line 306, in main
    worker_result = pool.map(worker_mini_star, itertools.izip(itertools.repeat(second_arg),a_args))
  File "C:\Python27\lib\multiprocessing\pool.py", line 250, in map
    return self.map_async(func, iterable, chunksize).get()
  File "C:\Python27\lib\multiprocessing\pool.py", line 554, in get
    raise self._value
IOError: [Errno 9] Bad file descriptor

If it helps, here are the functions called by pool.map:

   def worker_mini(flag, chunk):
       row_comp = []
       for entry, entry2 in zip(chunk[0][0], chunk[1][0]):
           if entry == entry2:
               temp_comp = entry
           else:
               temp_comp = '%s|%s' % (entry, entry2)
           row_comp.append(temp_comp)
       return True, row_comp

   #takes a single tuple argument and unpacks the tuple to multiple arguments
   def worker_mini_star(flag_chunk):
       """Convert `f([1,2])` to `f(1,2)` call."""
       return worker_mini(*flag_chunk)

    def main():
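
In case the nested indexing in worker_mini is unclear, here is roughly the shape of data each call receives; the rows below are made up for illustration, not from my real files:

    # chunk is a (test_group, prod_group) pair; each group is a list of csv rows,
    # and worker_mini compares only the first row of each group
    chunk = ([['a', 'b', 'c']], [['a', 'x', 'c']])
    print worker_mini(True, chunk)
    # prints: (True, ['a', 'b|x', 'c'])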
    If you run your worker from a single process, do you still get errors? Try swapping out `pool.map` for just the builtin `map`. Not only might this tell you if the problem is caused by multiprocessing code or not, but it might also give you a more helpful exception traceback. – Blckknght May 24 '14 at 02:43
  • @Blckknght - thanks, I really appreciate the suggestions. I'll try these out when I am back in the office on Monday. Will report back then. – Roberto May 24 '14 at 16:45
  • @Blckknght - made some progress using your advice. Reducing to a single process did not help, but using `map` in place of `pool.map` did produce more useful debug info! Thanks. – Roberto May 27 '14 at 00:09

0 Answers