I have a function where I'm reading lines from stdin and using itertools.groupby to group lines and then do some processing. My code looks like this:

import sys
from itertools import groupby
from operator import itemgetter

def func(lines):
    for key, group in groupby(lines, key=itemgetter(0)):
        lst = list(group)
        results = my_cpu_intensive_function(lst)

        # send results to stdout for further processing
        print(results)

def main():
    # a generator holding lists of data
    lines = (line.strip().split('\t') for line in sys.stdin)
    func(lines)

Everything works the way I want it to, but my_cpu_intensive_function() is very CPU-intensive. How can I parallelize this function to speed up my code? I was looking at multiprocessing.Pool(), but I couldn't figure out how to use it or whether it was the right tool for the job.

turtle

1 Answer

Multiprocessing is definitely a good module to use. I haven't used Pool personally (I usually just spawn processes with Process() ), but it looks pretty easy in the documentation.

https://docs.python.org/2/library/multiprocessing.html

Just make a list of your inputs to the worker processes, and use pool.map():

import sys
from itertools import groupby
from operator import itemgetter
from multiprocessing import Pool

def func(lines):
    # collect every group up front (this holds all of the input in memory)
    groups = []
    for key, group in groupby(lines, key=itemgetter(0)):
        groups.append(list(group))

    # one worker process per group
    pool = Pool(processes=len(groups))

    # run the function on each group in parallel; results keep the input order
    results_lst = pool.map(my_cpu_intensive_function, groups)
    pool.close()
    pool.join()

    # send results to stdout for further processing
    for results in results_lst:
        print(results)

def main():
    # a generator holding lists of data
    lines = (line.strip().split('\t') for line in sys.stdin)
    func(lines)

Though if you have a lot of groups, be careful with how many subprocesses you spawn. Replacing len(groups) with min(len(groups), 16) or so is probably a good idea, so that the pool never grows past 16 workers. You can find out a good value to cap at via this question.
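The point of the cap is to take the smaller of the group count and some upper limit. Here is a minimal sketch of that, assuming the machine's CPU count is a reasonable default limit (the 16 above is only a suggestion). pool_size is a hypothetical helper name, not part of multiprocessing.

```python
import multiprocessing


def pool_size(num_groups, cap=None):
    """Pick a worker count: at least 1, at most `cap`, never more than num_groups."""
    if cap is None:
        # assumption: one worker per CPU is a sensible default upper limit
        cap = multiprocessing.cpu_count()
    return max(1, min(num_groups, cap))
```

It would be used as Pool(processes=pool_size(len(groups))). The max(1, ...) is there because Pool rejects a worker count of zero, which would otherwise happen on empty input.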

DenverCoder9
  • Thanks for the help. Unfortunately, appending each group into a list of groups won't work for me. I'm working with a lot of lines and I won't be able to hold all that data in memory. What I'd like is some way to split the groups and call `my_cpu_intensive_function` for each group. Make sense? – turtle Nov 20 '14 at 02:28
  • Ahh - I see. That shouldn't be too hard with Process(). I can post an example with that if I have a few minutes later today! – DenverCoder9 Nov 20 '14 at 13:17
  • Thanks! I figured it out. – turtle Nov 20 '14 at 17:52
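The thread does not show what the asker ended up doing. One possible way to handle the memory constraint raised in the comments is to hand groups to the pool in fixed-size batches, so only one batch is in memory at a time. The sketch below assumes Python 3. iter_groups, process_in_batches and batch_size are hypothetical names, and my_cpu_intensive_function is a stand-in for the real function.

```python
import sys
from itertools import groupby, islice
from multiprocessing import Pool
from operator import itemgetter


def my_cpu_intensive_function(group):
    # stand-in for the asker's real function, which the question does not show
    return len(group)


def iter_groups(lines):
    """Yield one list of rows per run of consecutive rows sharing the same first field."""
    for _key, group in groupby(lines, key=itemgetter(0)):
        yield list(group)


def process_in_batches(pool, lines, worker, batch_size=64):
    """Yield worker(group) for each group, holding at most batch_size groups at once.

    `pool` is any object with a map(func, iterable) method, e.g. multiprocessing.Pool.
    """
    groups = iter_groups(lines)
    while True:
        # pull only the next batch_size groups from the input stream
        batch = list(islice(groups, batch_size))
        if not batch:
            break
        # map() blocks until the whole batch is done and preserves input order
        for result in pool.map(worker, batch):
            yield result


def main():
    # a generator holding lists of data, as in the question
    lines = (line.strip().split('\t') for line in sys.stdin)
    with Pool() as pool:  # with no argument, Pool sizes itself from the CPU count
        for result in process_in_batches(pool, lines, my_cpu_intensive_function):
            # send results to stdout for further processing
            print(result)
```

main() should be called from under an if __name__ == "__main__": guard so that worker processes can import the module safely. The trade-off of batching is that workers sit idle at the end of each batch while the slowest group finishes. A larger batch_size reduces that idle time at the cost of more memory.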