58

(This question is about how to make multiprocessing.Pool() run code faster. I finally solved it, and the final solution can be found at the bottom of the post.)

Original Question:

I'm trying to use Python to compare a word with many other words in a list and retrieve a list of the most similar ones. To do that I am using the difflib.get_close_matches function. I'm on a relatively new and powerful Windows 7 laptop, with Python 2.6.5.

I want to speed up the comparison process, because my comparison list of words is very long and I have to repeat the comparison several times. When I heard about the multiprocessing module, it seemed logical that if the comparison could be broken up into worker tasks and run simultaneously (making use of more of the machine's power), my comparison task would finish faster.

However, even after trying many different approaches, including methods shown in the docs and suggested in forum posts, the Pool method is incredibly slow, much slower than just running the original get_close_matches function on the entire list at once. I would like help understanding why Pool() is so slow and whether I am using it correctly. I'm only using this string-comparison scenario as an example because it is the most recent case where I was unable to understand or get multiprocessing to work for me rather than against me. Below is example code from the difflib scenario showing the time difference between the ordinary and the pooled methods:

from multiprocessing import Pool
import random, time, difflib

# constants
wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(1000000)]
mainword = "hello"

# comparison function
def findclosematch(subwordlist):
    matches = difflib.get_close_matches(mainword,subwordlist,len(subwordlist),0.7)
    if matches <> []:
        return matches

# pool
print "pool method"
if __name__ == '__main__':
    pool = Pool(processes=3)
    t=time.time()
    result = pool.map_async(findclosematch, wordlist, chunksize=100)
    #do something with result
    for r in result.get():
        pass
    print time.time()-t

# normal
print "normal method"
t=time.time()
# run function
result = findclosematch(wordlist)
# do something with results
for r in result:
    pass
print time.time()-t

The word to be found is "hello", and the list of words in which to find close matches is a 1-million-item list of 5 randomly joined characters (only for illustration purposes). I use 3 processor cores and the map function with a chunksize of 100 (list items to be processed per worker, I think?); I also tried chunksizes of 1000 and 10,000, but there was no real difference. Notice that in both methods I start the timer right before calling my function and end it right after looping through the results. As you can see below, the timing results are clearly in favor of the original non-Pool method:

>>> 
pool method
37.1690001488 seconds
normal method
10.5329999924 seconds
>>> 

The Pool method is almost 4 times slower than the original method. Is there something I am missing here, or am I misunderstanding how pooling/multiprocessing works? I suspect part of the problem could be that the map function returns None for non-matching chunks and so adds thousands of unnecessary items to the results list, even though I only want actual matches returned and have written the function that way. From what I understand, that is just how map works. I have heard about other functions like filter that only collect non-False results, but I don't think multiprocessing/Pool supports a filter method. Are there any other functions besides map/imap in the multiprocessing module that could help me return only what my function returns? As I understand it, apply is more for passing multiple arguments.
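
For illustration, one way to keep only actual matches is simply to filter the pooled results afterwards: Pool has no filter method, but map always returns one entry per chunk, so empty or None entries can be dropped in a list comprehension. A minimal sketch of that idea, using a tiny made-up chunk list rather than the full benchmark:

from multiprocessing import Pool
import difflib

def findclosematch(subwordlist):
    # returning the (possibly empty) list keeps the result type consistent
    return difflib.get_close_matches("hello", subwordlist, len(subwordlist), 0.7)

if __name__ == '__main__':
    chunks = [["hello", "hallo", "world"], ["xyzzy", "qwert"]]  # made-up example chunks
    pool = Pool(processes=3)
    # keep only the chunks that actually produced matches
    matches = [m for m in pool.map(findclosematch, chunks) if m]
    print matches
    pool.close()
    pool.join()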

I know there's also the imap function, which I tried, but without any time improvement. The reason is the same reason I have had trouble understanding what's so great about the itertools module, supposedly "lightning fast": that's true for the call itself, but in my experience, and from what I've read, that's because the call doesn't actually do any calculations. So when it's time to iterate through the results to collect and analyze them (without which there would be no point in calling the function), it takes just as much or sometimes more time than just using the normal version of the function straight up. But I suppose that's for another post.
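
The laziness is easy to see for yourself: the imap call returns almost instantly, and the time is spent when the iterator is actually consumed. A small sketch (the toy square function is made up just for this demonstration):

from multiprocessing import Pool
import time

def square(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(processes=3)
    t = time.time()
    lazy = pool.imap(square, xrange(1000000), chunksize=1000)
    print "imap call itself:", time.time() - t    # near-instant: nothing has been computed yet
    t = time.time()
    results = list(lazy)                          # the work (and the IPC) happens here
    print "consuming the iterator:", time.time() - t
    pool.close()
    pool.join()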

Anyway, I'm excited to see if someone can nudge me in the right direction here, and I really appreciate any help on this. I'm more interested in understanding multiprocessing in general than in getting this particular example to work, though some example solution code would be useful to aid my understanding.

The Answer:

It seems the slowdown had to do with the slow startup time of additional processes. I couldn't get the .Pool() function to be fast enough. My final solution to make it faster was to manually split the workload list, use multiple .Process() instead of .Pool(), and return the results in a Queue. But I wonder if the most crucial change might have been splitting the workload in terms of the main words to look up rather than the words to compare against, perhaps because the difflib search function is already so fast. Here is the new code, running 5 processes at the same time; it turned out about 10x faster than the simple code (6 seconds vs 55 seconds). Very useful for fast fuzzy lookups, on top of how fast difflib already is.

from multiprocessing import Process, Queue
import difflib, random, time

def f2(wordlist, mainwordlist, q):
    for mainword in mainwordlist:
        matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
        q.put(matches)

if __name__ == '__main__':

    # constants (for 50 input words, find closest match in list of 100 000 comparison words)
    q = Queue()
    wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(100000)]
    mainword = "hello"
    mainwordlist = [mainword for each in xrange(50)]

    # normal approach
    t = time.time()
    for mainword in mainwordlist:
        matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
        q.put(matches)
    print time.time()-t

    # split work into 5 or 10 processes
    processes = 5
    def splitlist(inlist, chunksize):
        return [inlist[x:x+chunksize] for x in xrange(0, len(inlist), chunksize)]
    print len(mainwordlist)/processes
    mainwordlistsplitted = splitlist(mainwordlist, len(mainwordlist)/processes)
    print "list ready"

    t = time.time()
    processes = []
    for submainwordlist in mainwordlistsplitted:
        print "sub"
        p = Process(target=f2, args=(wordlist,submainwordlist,q,))
        p.daemon = True  # the attribute is lowercase 'daemon'
        processes.append(p)
        p.start()
    for p in processes:  # join every process, not just the last one created
        p.join()
    print time.time()-t
    # one result list was put on the queue per input word, so fetch exactly that many
    # (an unbounded "while True: q.get()" would block forever once the queue is empty)
    for nr in xrange(len(mainwordlist)):
        print q.get()
Karim Bahgat
  • Have you tried increasing the chunk size? Like chunksize=100000 or so? – Hannes Ovrén Dec 22 '13 at 08:40
  • to compare apples to apples, you should compare: `result = pool.map(findclosematch, wordlist)` vs. `result = map(findclosematch, wordlist)`. – jfs Dec 22 '13 at 09:57
  • then change the calls so that `findclosematch()` does more work. Otherwise pickling/unpickling the arguments will dominate the run time. – jfs Dec 22 '13 at 09:59
  • `if matches <> []:` is awful, use `if matches:` instead. – jfs Dec 22 '13 at 10:00
  • you could use `from timeit import default_timer as timer` instead of `time.time()` to get the best timer for performance measurements in a portable way. – jfs Dec 22 '13 at 10:02
  • Do **not** use `<>`. It has been deprecated for a *long* time, and in Python 3 it will raise a `SyntaxError`, so you are making the code much less forward-compatible by using it. Note that spawning processes and inter-process communication cost *a lot*. If you want to decrease time with multiple processes, you must be sure that the computing time is big enough that the overhead doesn't matter. In your case I believe this is not true. – Bakuriu Dec 22 '13 at 11:00
  • Also, the `if matches:` check is completely useless and might create bugs. I just tried to run the script with some parameters modified a bit and got a `TypeError: NoneType object is not iterable` due to that bogus check. 99.9% of the time a function should always return the same type. Do not special-case empty results with `None`, because you are just complicating the handling of the function's result in the rest of the code. – Bakuriu Dec 22 '13 at 11:05
  • Hannes Ovrén, yes, I did try increasing the chunksize and there was no noticeable difference. @J.F.Sebastian and Bakuriu, thanks for the tips on improving my syntax and on how to better time and compare my test cases. As Multimedia Mike also said, I think the main problem was that multiprocessing has a slow startup time, so the workload needs to be distributed in a way that amortizes it. In fact, I did some tweaks and manually created my own pool, which gave me more flexibility, and thus I managed to make multiprocessing faster (see my comment to Multimedia Mike). – Karim Bahgat Jan 02 '14 at 22:12
  • @KarimBahgat: I faced a similar situation and had to resolve it by removing the pool api. I am interested about your 10X improvement, though. How did you find the exact number of processes to spawn to get this speed improvement? I understand this is a classical scalability question in parallel programming, but just curious to know the heuristic for your problem. In my case, 4 processes is good and 8 brings the performance down. – Jey Jun 27 '14 at 11:42

4 Answers

47

These problems usually boil down to the following:

The function you are trying to parallelize doesn't require enough CPU resources (i.e. CPU time) to justify parallelization!

Sure, when you parallelize with multiprocessing.Pool(8), you theoretically (but not practically) could get an 8x speedup.

However, keep in mind that this isn't free - you gain this parallelization at the expense of the following overhead:

  1. Creating a task for every chunk (of size chunksize) in your iter passed to Pool.map(f, iter)
  2. For each task
    1. Serialize the task, and the task's return value (think pickle.dumps())
    2. Deserialize the task, and the task's return value (think pickle.loads())
    3. Waste significant time waiting for Locks on shared memory Queues, while worker processes and parent processes get() and put() from/to these Queues.
  3. One-time cost of calls to os.fork() for each worker process, which is expensive.

In essence, when using Pool() you want:

  1. High CPU resource requirements
  2. Low data footprint passed to each function call
  3. Reasonably long iter to justify the one-time cost of (3) above.
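
As a rough illustration of that tradeoff, compare a function with almost no CPU work against one with plenty (a toy sketch, not from the original answer; cheap and expensive are made-up functions, and exact timings will vary by machine):

from multiprocessing import Pool
import time

def cheap(x):
    return x + 1                              # almost no CPU work: the overhead dominates

def expensive(x):
    return sum(i * i for i in xrange(20000))  # enough CPU work to amortize the overhead

if __name__ == '__main__':
    data = range(1000)
    pool = Pool(processes=4)
    for func in (cheap, expensive):
        t = time.time()
        map(func, data)                       # plain single-process map
        serial = time.time() - t
        t = time.time()
        pool.map(func, data, chunksize=50)
        parallel = time.time() - t
        print "%s: serial %.2fs, pool %.2fs" % (func.__name__, serial, parallel)
    pool.close()
    pool.join()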

For a more in-depth exploration, this post and the linked talk walk through how passing large data to Pool.map() (and friends) gets you into trouble.

Raymond Hettinger also talks about proper use of Python's concurrency here.

The Aelfinn
18

My best guess is inter-process communication (IPC) overhead. In the single-process instance, the single process has the word list. When delegating to various other processes, the main process needs to constantly shuttle sections of the list to other processes.

Thus, it follows that a better approach might be to spin off n processes, each of which is responsible for loading/generating 1/n segment of the list and checking if the word is in that part of the list.

I'm not sure how to do that with Python's multiprocessing library, though.
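
A rough sketch of that idea (not from the answer; make_slice is a made-up placeholder for however each worker would generate or load its own portion of the data, so the big list never has to be shipped over IPC):

from multiprocessing import Process, Queue
import difflib, random

def make_slice(seed, size):
    # placeholder: each worker builds (or loads) its own part of the word list locally
    rnd = random.Random(seed)
    return ["".join(rnd.choice("abcdefghijklmnopqersty") for _ in xrange(5))
            for _ in xrange(size)]

def worker(seed, size, q):
    words = make_slice(seed, size)                 # the data never crosses a process boundary
    q.put(difflib.get_close_matches("hello", words, len(words), 0.7))

if __name__ == '__main__':
    n, total = 3, 300000
    q = Queue()
    procs = [Process(target=worker, args=(i, total // n, q)) for i in xrange(n)]
    for p in procs:
        p.start()
    results = [q.get() for _ in xrange(n)]         # drain the queue before joining
    for p in procs:
        p.join()
    print sum(results, [])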

Multimedia Mike
  • I agree and suspect it was something like the process startup time and communication that was bottlenecking my script. I eventually used the multiprocessing.Process function instead, which allowed me to manually split my list and gave a 10x time improvement. See my updated post for the new code I used. – Karim Bahgat Jan 02 '14 at 22:23
5

I experienced something similar with the Pool on a different problem. I'm not sure of the actual cause at this point...

The Answer edit by OP Karim Bahgat describes the same solution that worked for me. After switching to a Process & Queue system, I was able to see speedups in line with the number of cores on the machine.

Here's an example.

import multiprocessing as mp

def do_something(data):
    return data * 2

def consumer(inQ, outQ):
    while True:
        try:
            # get a new message
            val = inQ.get()

            # this is the 'TERM' signal
            if val is None:
                break

            # unpack the message
            pos = val[0]  # it's helpful to pass the position in the array in and out
            data = val[1]

            # process the data
            ret = do_something(data)

            # send the response / results
            outQ.put( (pos, ret) )


        except Exception, e:
            print "error!", e
            break

def process_data(data_list, inQ, outQ):
    # send pos/data to workers
    for i,dat in enumerate(data_list):
        inQ.put( (i,dat) )

    # process results
    for i in range(len(data_list)):
        ret = outQ.get()
        pos = ret[0]
        dat = ret[1]
        data_list[pos] = dat


def main():
    # initialize things
    n_workers = 4
    inQ = mp.Queue()
    outQ = mp.Queue()
    # instantiate workers
    workers = [mp.Process(target=consumer, args=(inQ,outQ))
               for i in range(n_workers)]

    # start the workers
    for w in workers:
        w.start()

    # gather some data
    data_list = [ d for d in range(1000)]

    # lets process the data a few times
    for i in range(4):
        process_data(data_list, inQ, outQ)  # pass the queues the function expects

    # tell all workers, no more data (one msg for each)
    for i in range(n_workers):
        inQ.put(None)
    # join on the workers
    for w in workers:
        w.join()

    # print out final results  (i*16)
    for i,dat in enumerate(data_list):
        print i, dat


if __name__ == '__main__':
    main()
verdverm
2

Pool.map is slower because it takes time to start the processes and then transfer the necessary data from the parent to all the worker processes, as Multimedia Mike said. I went through a similar problem and switched to multiprocessing.Process.

But multiprocessing.Process takes more time to start the processes than Pool.map does.

Solution:

  • Create the processes in advance and keep the static data inside the processes.
  • Use queues to pass data to the processes.
  • Also use queues to receive the results from the processes.

This way I managed to search for the best match among 1 million face features in 3 seconds on a Core i5-8265U laptop running Windows.

Code - multiprocess_queue_matcher.py:

import multiprocessing

from utils import utils

no_of_processes = 0
input_queues = []
output_queues = []
db_embeddings = []
slices = None


def set_data(no_of_processes1, input_queues1, output_queues1, db_embeddings1):
    global no_of_processes
    no_of_processes = no_of_processes1
    global input_queues
    input_queues = input_queues1
    global output_queues
    output_queues = output_queues1
    global db_embeddings
    print("db_embeddings1 size = " + str(len(db_embeddings1)))
    db_embeddings.extend(db_embeddings1)
    global slices
    slices = chunks()


def chunks():
    size = len(db_embeddings) // no_of_processes
    return [db_embeddings[i:i + size] for i in range(0, len(db_embeddings), size)]


def do_job2(slice, input_queue, output_queue):
    while True:
        emb_to_search = input_queue.get()
        dist1 = 2
        item1 = []
        data_slice = slice
        # emb_to_search = obj[1]
        for item in data_slice:
            emb = item[0]
            dist = utils.calculate_squared_distance(emb_to_search, emb)
            if dist < dist1:
                dist1 = dist
                item1 = item
                item1.append(dist1)
        output_queue.put(item1)
    # if return_value is None:
    #     return item1
    # else:
    #     return_value.set_value(None, item1[1], item1[2], item1[3], item1[4], dist1)


def submit_job(emb):
    for i in range(len(slices)):
        input_queues[i].put(emb)


def get_output_queues():
    return output_queues


def start_processes():
    # slice = self.chunks()
    # ctx = multiprocessing.get_context("spawn")
    # BaseManager.register('FaceData', FaceData)
    # manager = BaseManager()
    # manager.start()
    # return_values = []
    global no_of_processes
    global input_queues
    global output_queues
    processes = []
    pos = 0
    for i in range(no_of_processes):
        p = multiprocessing.Process(target=do_job2, args=(slices[i], input_queues[i], output_queues[i],))
        p.daemon = True  # lowercase 'daemon' is the real attribute
        processes.append(p)
        pos += 1
        p.start()

Then use this module where you need.

Startup code for Flask (run in advance of the first request):

mysql = None

db_operator = None

all_db_embeddings = []

input_queues = []
output_queues = []
no_of_processes = 4


@app.before_first_request
def initialize():
    global mysql
    global db_operator
    mysql = MySQL(app)
    db_operator = DBOperator(mysql)
    ret, db_embeddings, error_message = db_operator.get_face_data_for_all_face_ids_for_all_users()
    all_db_embeddings.extend(db_embeddings)
    for i in range(no_of_processes):
        in_q = multiprocessing.Queue()
        out_q = multiprocessing.Queue()
        input_queues.append(in_q)
        output_queues.append(out_q)
    multiprocess_queue_matcher.set_data(no_of_processes, input_queues, output_queues, all_db_embeddings)
    multiprocess_queue_matcher.start_processes()

Pass jobs to the processes on demand from any request endpoint:

    emb_to_match = all_db_embeddings[0][0]
    starttime = time.time()
    multiprocess_queue_matcher.submit_job(emb_to_match)
    outputs = []
    for i in range(no_of_processes):
        out_q = output_queues[i]
        outputs.append(out_q.get())
    max = [None, None, None, None, None, 2.0]
    for val in outputs:
        if val[5] < max[5]:
            max = val
    time_elapsed = time.time() - starttime
    return jsonify(
        {"status": "success", "message": "Face search completed", "best_match_faceid": max[1],
         "name": max[2], "distance": max[5], "search_time": time_elapsed})

Any suggestions for improvements to this code?

Gopal Singh Sirvi