1

I use multiprocessing in my python code to run asynchronously a function:

import multiprocessing

po = multiprocessing.Pool()
for elements in a_list:
    results.append(po.apply_async(my_module.my_function, (some_arguments, elements, a_big_argument)))               
po.close()
po.join()
for r in results:
    a_new_list.add(r.get())

a_big_argument is a dictionary. I give it as an argument. It is big in the sense that it is between 10 and 100 Mo. It seems like it has a big impact on the performance of my code.

I'm probably doing something stupid and not efficient here, since the performance of my code really went down with this new argument.

What is the best way to deal with a big dictionary? I don't want to load it every time in my function. Would it be a solution to create a database instead and to connect to it?

Here is a code you can run:

'''
Created on Mar 11, 2013

@author: Antonin
'''

import multiprocessing
import random

# generate an artificially big dictionary
def generateBigDict():
    myBigDict = {}
    for key in range (0,1000000):
        myBigDict[key] = 1
    return myBigDict

def myMainFunction():
    # load the dictionary
    myBigDict = generateBigDict()
    # create a list on which we will asynchronously run the subfunction
    myList = []
    for list_element in range(0,20):
        myList.append(random.randrange(0,1000000))
    # an empty set to receive results
    set_of_results = set()
    # there is a for loop here on one of the arguments
    for loop_element in range(0,150):
        results = []
        # asynchronoulsy run the subfunction
        po = multiprocessing.Pool()
        for list_element in myList:
            results.append(po.apply_async(mySubFunction, (loop_element, list_element, myBigDict)))               
        po.close()
        po.join()
        for r in results:
            set_of_results.add(r.get())
    for element in set_of_results:
        print element

def mySubFunction(loop_element, list_element, myBigDict):
    import math
    intermediaryResult = myBigDict[list_element]
    finalResult = intermediaryResult + loop_element
    return math.log(finalResult)

if __name__ == '__main__':
    myMainFunction()
Antonin
  • 1,748
  • 7
  • 19
  • 24
  • 1
    Can you provide a small real program that demonstrate the problem? See http://SSCCE.org – Robᵩ Mar 11 '13 at 21:43
  • 2
    Are you on windows? On windows `multiprocessing` *must* pickle the arguments and send then to the subprocesses, while on unix it *may* be able to `fork`(which should be much more efficient). – Bakuriu Mar 11 '13 at 21:44
  • 2
    I tested on my linux machine and it takes about 0.05 seconds to pass a `dict` containing about 200k objects. – Bakuriu Mar 11 '13 at 21:52
  • I've just added an (simple? small? realistic?) real program. I'm working on UNIX. I need to explore `fork`, I don't know how to use it. – Antonin Mar 11 '13 at 22:20

3 Answers3

3

I used multiprocessing.Manager to do it.

import multiprocessing

manager = multiprocessing.Manager()
a_shared_big_dictionary = manager.dict(a_big_dictionary)

po = multiprocessing.Pool()
for elements in a_list:
    results.append(po.apply_async(my_module.my_function, (some_arguments, elements, a_shared_big_dictionary)))               
po.close()
po.join()
for r in results:
    a_new_list.add(r.get())

Now, it is much faster.

Antonin
  • 1,748
  • 7
  • 19
  • 24
1

See the answer to Shared-memory objects in python multiprocessing question.

It suggests either using multiprocessing.Array to pass arrays to subprocesses or using fork().

Community
  • 1
  • 1
CaptSolo
  • 1,771
  • 1
  • 16
  • 17
  • Thank you for your answer. It was very helpful to read your links. Knowing that the object I'd like to share between my processes is a dict (and not an array), wouldn't it be better to use [Manager](http://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes) for this? – Antonin Mar 13 '13 at 14:13
  • 1
    You were looking for a speed increase. Manager involves serializing (pickling) and de-serializing Python objects when passing them to child processes (if I understand it correctly). That should be slower than shared-memory solutions such as Array. Check for yourself, though, what performance you get. – CaptSolo Mar 17 '13 at 21:44
  • Looking at your solution (using Manager), I see you got the performance increase that was needed. Good :) – CaptSolo Mar 17 '13 at 21:48
1

Any argument you pass to one of the Pool methods (e.g. apply_async) needs to be pickled, sent to the worker processes via pipes, and unpickled in the worker processes. This pickle/pass/unpickle process can be expensive in time and memory, especially if you have a large object graph since each worker process must create a separate copy.

There are many different ways to avoid these pickles depending on the exact shape of your problem. Since your workers are only reading your dictionary and not writing to it, you can safely reference it directly from your function (i.e. not pass it to apply_async) and rely on fork() to avoid creating a copy in worker processes.

Even better, you can change mySubFunction() so that it accepts intermediaryResult as an argument instead of looking it up using list_element and myBigDict. (You may be able to do this with a closure, but I am not 100% sure that pickle won't try to copy the closed-over myBigDict object as well.)

Alternatively, you can put myBigDict in some place where all processes can share it safely, e.g. one of the simple persistance methods, such as dbm or sqlite, and have workers access it from there.

Unfortunately all these solutions require that you change the shape of your task functions. Avoiding this "shape-changing" is one reason why people like "real" cpu threads.

Francis Avila
  • 31,233
  • 6
  • 58
  • 96
  • Thank you for your answer. You mention "some place where all processes can share it safely". Do you think [Server process with multiprocessing.Manager](http://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes) would correspond to this? – Antonin Mar 13 '13 at 14:11
  • 1
    A manager would probably work too, although keep in mind it only provides consistency, not atomicity, integrity or durability--i.e. if you share a data structure for writing, you still need to manage concurrency primitives yourself (e.g. with shared locks--which the manager can also hold for you). – Francis Avila Mar 13 '13 at 14:23
  • In your case, a manager and proxy object is really not necessary anyway. As I said, you can just make the big dict in-scope to worker methods and access it directly, and rely on `fork()` copy-on-write behavior to make sure it isn't copied. – Francis Avila Mar 13 '13 at 14:32
  • I found it much more difficult to find information about `fork()`. You suggest to use it but I have no idea how to do it. Manager is very easy to use and takes maybe 3 words on one line, no change in my function. It was done in 2 minutes, and I know exactly what it does. If you've any reference about `fork()`, I'd be interested. – Antonin Mar 13 '13 at 20:57
  • 1
    You don't have to do *anything* (except create your data structure before you create your workers--which you already do). multiprocessing already uses fork() to create the workers. You don't *use* fork() explicitly, you just *rely on* it's copy-on-write process spawning behavior.... – Francis Avila Mar 14 '13 at 01:38
  • OK, thank you for the explanation. Manager is still improving a lot my performance: from 40h to 2h30. – Antonin Mar 14 '13 at 09:50