
I am trying to learn multiprocessing with Python. I wrote a simple script that should feed each process 1000 lines from a text input file. My main function reads a line, splits it, and then performs some very simple operations on the elements in the string. Eventually the results should be written to an output file.

When I run it, 4 processes are correctly spawned, but only one process is actually doing work, and with minimal CPU. As a result the code is very slow and defeats the purpose of using multiprocessing in the first place. I don't think I have a global-list problem like in this question (python multiprocessing apply_async only uses one process), and I don't think my function is too trivial, as in this case (Python multiprocessing.Pool() doesn't use 100% of each CPU).

I can't understand what I'm doing wrong; any help or suggestion is appreciated. Here's the basic code:

import multiprocessing
import itertools

def myfunction(line):
    returnlist = []
    list_of_elem = line.split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]

    ids = list_of_elem[2].split("|")

    for x in itertools.permutations(ids, 2):
        if x[1] == elem_to_check:
            returnlist.append(",".join([elem_id, x[0], x[1], "1\n"]))
        else:
            returnlist.append(",".join([elem_id, x[0], x[1], "0\n"]))

    return returnlist

def grouper(n, iterable, padvalue=None):
    return itertools.izip_longest(*[iter(iterable)]*n, fillvalue=padvalue)

if __name__ == '__main__':
    my_data = open(r"my_input_file_to_be_processed.txt","r")
    my_data = my_data.read().split("\n")   

    p = multiprocessing.Pool(4)

    for chunk in grouper(1000, my_data):
        results = p.map(myfunction, chunk)
        for r in results:
            with open(r"my_output_file", "ab") as outfile:
                outfile.write(r)

EDIT: I modified my code following the suggestions (deleting the redundant data pre-processing). However, the problem still seems to be there.

import multiprocessing
import itertools

def myfunction(line):
    returnlist = []
    list_of_elem = line.split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]

    ids = list_of_elem[2].split("|")

    for x in itertools.permutations(ids, 2):
        if x[1] == elem_to_check:
            returnlist.append(",".join([elem_id, x[0], x[1], "1\n"]))
        else:
            returnlist.append(",".join([elem_id, x[0], x[1], "0\n"]))

    return returnlist

if __name__ == '__main__':
    my_data = open(r"my_input_file_to_be_processed.txt", "r")

    p = multiprocessing.Pool(4)

    results = p.map(myfunction, my_data, chunksize=1000)
    for r in results:
        with open(r"my_output_file", "ab") as outfile:
            outfile.write(r)
user2447387
  • Your whole outer loop seems pointless to me, as `p.map` will distribute the chunk's lines among the workers. And why slice the data by hand when `Pool.map` already has a `chunksize` param? – robyschek Apr 26 '16 at 17:40
  • I guess you are not preparing your data correctly. You should only call `Pool.map` once, with something like `p.map(func, dataset)` if your dataset has previously been split into an appropriate number of chunks, or with the `chunksize` parameter like `p.map(func, dataset, chunksize)` if it hasn't. (Putting `Pool.map` in your loop makes you compute each chunk one after the other instead of concurrently.) – mgc Apr 26 '16 at 17:44
  • Thanks to both of you for the suggestion regarding the chunks. @robyschek: I think I still need to split the chunks into single lines, which the basic function operates on, right? – user2447387 Apr 26 '16 at 17:47
  • @user2539785 I think `my_data` is already split into lines and `myfunction` expects a single line; if so, then no further split is needed and `my_data` may be passed to `map` as is. – robyschek Apr 26 '16 at 17:54
  • I think getting rid of the grouper function is what solves the problem, as @mgc correctly pointed out. I tried to modify my code using p.map(func, dataset, chunksize), but I get an error in results = p.map(myfunction, my_data, 1000): File "C:\Python27\lib\multiprocessing\pool.py", line 251, in map return self.map_async(func, iterable, chunksize).get(), File "C:\Python27\lib\multiprocessing\pool.py", line 567, in get raise self._value, IndexError: list index out of range – user2447387 Apr 26 '16 at 18:09
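To make the suggestion in these comments concrete, here is a minimal sketch of the single-call approach (assuming `myfunction` from the question is defined in the same module; the input file name and the chunk size of 1000 are the ones the question uses). Note that `Pool.map` still calls `myfunction` once per line; `chunksize` only controls how many lines are shipped to each worker per task:

import multiprocessing

if __name__ == '__main__':
    with open(r"my_input_file_to_be_processed.txt", "r") as f:
        my_data = f.read().split("\n")

    p = multiprocessing.Pool(4)
    # one map call; the pool slices my_data into batches of 1000 lines
    # and distributes those batches among the 4 workers
    results = p.map(myfunction, my_data, chunksize=1000)
    # note: any line with fewer than 6 comma-separated fields (for example the
    # trailing empty string produced by split("\n")) will still raise an
    # IndexError inside myfunction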

1 Answer


Based on your snippet of code, I would do something like this: chunk the file into 8 parts and have the computation done by 4 workers (why 8 chunks and 4 workers? Just an arbitrary choice for the example):

from multiprocessing import Pool
import itertools

def myfunction(lines):
    returnlist = []
    for line in lines:
        list_of_elem = line.split(",")
        elem_id = list_of_elem[1]
        elem_to_check = list_of_elem[5]
        ids = list_of_elem[2].split("|")

        for x in itertools.permutations(ids, 2):
            # x is a tuple of two ids; unpack it so every item passed to join is a string
            returnlist.append(",".join(
                [elem_id, x[0], x[1], "1\n" if x[1] == elem_to_check else "0\n"]))

    return returnlist

def chunk(it, size):
    # return an iterator of successive tuples of `size` items,
    # stopping at the first empty tuple
    it = iter(it)
    return iter(lambda: tuple(itertools.islice(it, size)), ())

if __name__ == "__main__":
    my_data = open(r"my_input_file_to_be_processed.txt","r")
    my_data = my_data.read().split("\n")   

    prep = [strings for strings in chunk(my_data, round(len(my_data) / 8))]
    with Pool(4) as p:
        res = p.map(myfunction, prep)

    # flatten the per-chunk result lists into a single list
    result = res.pop(0)
    _ = list(map(lambda x: result.extend(x), res))
    print(result)  # ... or do something with the result
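If the output should go to a file rather than be printed, a minimal sketch (assuming the flattened `result` list from above; the output file name is just a placeholder) is to write everything once from the parent process after the pool has finished, instead of reopening the file for every item:

# write the combined results once, from the main process
# (every string in result already ends with "\n")
with open(r"my_output_file.txt", "w") as outfile:
    outfile.writelines(result)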

Edit: this assumes you are confident that all lines are formatted the same way and will not raise any error.

Based on your comments, it might be useful to find out what exactly is going wrong in your function or in the content of your file, either by testing it without multiprocessing or by wrapping the body in a broad (admittedly ugly) try/except, so that some output is almost certainly produced in every case (either the exception or the result):

def myfunction(lines):
    returnlist = []
    for line in lines:
        try:
            list_of_elem = line.split(",")
            elem_id = list_of_elem[1]
            elem_to_check = list_of_elem[5]
            ids = list_of_elem[2].split("|")

            for x in itertools.permutations(ids, 2):
                # as above, unpack the permutation tuple so join only receives strings
                returnlist.append(",".join(
                    [elem_id, x[0], x[1], "1\n" if x[1] == elem_to_check else "0\n"]))
        except Exception as err:
            returnlist.append('I encountered error {} on line {}'.format(err, line))

    return returnlist
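To test without multiprocessing first, a minimal sketch (reusing the `prep` chunks built in the main block above; the slice size is arbitrary) is to call the function directly in the parent process, where any problem is visible immediately, either as a raised exception with the original function or as the error strings appended by this try/except version. A typical culprit would be the trailing empty string that `read().split("\n")` produces when the file ends with a newline, since it has fewer than 6 comma-separated fields:

# quick sanity check, no Pool involved
sample = prep[0][:20]   # first few lines of the first chunk
for out in myfunction(sample):
    print(out)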
mgc
  • Thanks. However this throws an error: `with p.Pool(4) as mp_pool:` AttributeError: __exit__ – user2447387 Apr 26 '16 at 18:05
  • You should write `with Pool(4) as p:` instead of `with p.Pool(4) ...`, I guess. – mgc Apr 26 '16 at 18:15
  • No, that's not the problem apparently. It throws the same error. – user2447387 Apr 26 '16 at 18:17
  • There is also a mistake in your function `myfunction`, in the naming of the return list (declared as `return_list` but used as `returnlist`), it might come from here. I modified it in my answer. – mgc Apr 26 '16 at 18:22
  • You're right, I corrected the mistake in my question. The error is somehow still there though – user2447387 Apr 26 '16 at 18:29
  • My bad, I spotted the error (I guess). You also have to slightly refactor the code of `myfunction`, as it's not one line but a chunk of lines that is passed as the argument (see my edits). – mgc Apr 26 '16 at 18:35
  • Somehow the problem persists :( I think it has something to do with the "with" statement + Pool – user2447387 Apr 26 '16 at 18:50
  • I tried to run your code with python3.3 and now the error is different, saying: res = p.map(myfunction, prep) File "d:\anaconda3\lib\multiprocessing\pool.py", line 260, in map return self._map_async(func, iterable, mapstar, chunksize).get() File "d:\anaconda3\lib\multiprocessing\pool.py", line 608, in get raise self._value IndexError: list index out of range. I think the previous error had to do with this problem: https://stackoverflow.com/questions/25968518/python-multiprocessing-lib-error-attributeerror-exit – user2447387 Apr 26 '16 at 19:07
  • Sorry to hear that.. I don't know exactly about the `with` syntax in Python 2.7, but just a last insight: are you sure that all your lines can go through `myfunction` without throwing an error? (Code using `multiprocessing` can sometimes be harder to debug, so if an exception is raised in `myfunction` it won't appear as clearly in the interpreter.) – mgc Apr 26 '16 at 19:10
  • According to your last comment this is exactly the kind of error I mentioned just above. – mgc Apr 26 '16 at 19:16
  • Yeah you might be right. Let me check that once again and if that's the case I will accept your answer as the correct one. – user2447387 Apr 26 '16 at 22:13
  • Now it's working, it was indeed my fault, sorry about that. However, if I try to write out the results to a file, I notice that all the processes disappear and just one remains. Is there an efficient way to do that as well? So far I've just added the following lines at the end of the code: `for r in result: with open(r"outfile.txt","a") as outfile: outfile.write(str(r))` Is there a better way? Thanks for the patience – user2447387 Apr 26 '16 at 22:30