2

I am testing the following code and would like to make sure it is correct:

from threading import Thread
import cPickle

def get_user(start, end):
    global users, vusers
    for j in range(start,end):
        if str(users[j]).find('@N') != -1:
            vusers.append(users[j])


if __name__ == '__main__':
    users = cPickle.load(open('nsid.dmp', 'r'))
    vusers = []
    jobs = [Thread(target=get_user, args=(0,1839))\
        ,Thread(target=get_user, args=(1840,3679))\
        ,Thread(target=get_user, args=(3680,5519))\
        ,Thread(target=get_user, args=(5520,7359))\
        ,Thread(target=get_user, args=(7360,9199))\
        ,Thread(target=get_user, args=(9200,11039))\
        ,Thread(target=get_user, args=(11040,12879))\
        ,Thread(target=get_user, args=(12880,14719))\
        ,Thread(target=get_user, args=(14720,16559))\
        ,Thread(target=get_user, args=(16560,18399))\
        ,Thread(target=get_user, args=(18400,20239))\
        ,Thread(target=get_user, args=(20240,22079))\
        ,Thread(target=get_user, args=(22080,23919))\
        ,Thread(target=get_user, args=(23920,25759))\
        ,Thread(target=get_user, args=(25760,27599))\
        ,Thread(target=get_user, args=(27600,29439))]
    for jb in jobs:
        jb.start()  
    for jb in jobs:
        jb.join()
    vusers = list(set(vusers))
    out = open('validu.dmp', 'w')
    cPickle.dump(vusers, out)
    out.close()

So what I am trying to do is run the function get_user in parallel over different ranges. Of course, the real get_user is more complicated than this and there are many other conditions to check, but when I ran the code I couldn't see that it was any more time-efficient. Is there anything wrong with my code, and is this the proper way to write a multithreaded function? If not, how can I make it run in parallel?

DNA
user9287
  • If you want it to just be faster, try `pypy`. But if you want to understand how to make algorithms execute in parallel optimally using Python, use `multiprocessing` as indicated in the answers. Note that it introduces mildly more complexity because you have to send the input data and retrieve the output data and synthesize it. – Brian Cain Mar 26 '15 at 21:49
  • @BrianCain would it work if I replace `Thread` by multiprocessing.Process then collect the data as described here: http://stackoverflow.com/questions/8329974/can-i-get-a-return-value-from-multiprocessing-process – user9287 Mar 26 '15 at 22:08

2 Answers

1

This isn't going to run in parallel because of the GIL. I'm not even sure it will run concurrently, because each target function never appears to release the GIL (i.e. there are no system calls, blocking I/O, etc).

To get around the GIL you would use the multiprocessing module. Sharing state is that much harder when you use multiprocessing so you would need to reorganize the code to collect answers returned from each subprocess in the main thread of your program.
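A minimal sketch of that reorganization (not tied to the asker's data file), assuming the '@N' filter from the question: each worker process filters one slice and sends its partial result back to the parent over a queue.

```python
from multiprocessing import Process, Queue

def find_valid(chunk, queue):
    # Worker: filter one slice and send the partial result to the parent.
    queue.put([u for u in chunk if '@N' in str(u)])

def collect(users, nworkers=4):
    queue = Queue()
    size = len(users) // nworkers + 1
    jobs = [Process(target=find_valid, args=(users[i:i + size], queue))
            for i in range(0, len(users), size)]
    for j in jobs:
        j.start()
    # Drain the queue before joining so workers never block on a full pipe.
    results = [queue.get() for _ in jobs]
    for j in jobs:
        j.join()
    return [u for part in results for u in part]
```

Note that the partial results arrive in whichever order the workers finish, so the combined list is unordered (which is fine here, since the original code deduplicates with `set` anyway).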

Barry Rogerson
  • Global Interpreter Lock See http://www.dabeaz.com/python/UnderstandingGIL.pdf or http://stackoverflow.com/questions/1294382/what-is-a-global-interpreter-lock-gil – Alexander Mar 26 '15 at 21:53
0

I assume you want to multithread for performance reasons. If that's the case you should probably be using Process instead of Thread; you can read more about the limitations of Python threads here.

I really like multiprocessing.Pool, which under the hood uses multiprocessing.Process. It allows you to start up a fixed number of processes and queue tasks for them. Here is an example using pool.apply_async, but I would also look into pool.map and pool.map_async.

from multiprocessing import Pool

def get_user(users):
    return [u for u in users if 'N' in u]

def main(users):
    vusers = []
    results = []

    N = 2
    chunk = 200

    pool = Pool(N)
    for i in range(0, len(users), chunk):
        r = pool.apply_async(get_user, args=(users[i:i + chunk],))
        results.append(r)

    pool.close()
    pool.join()

    for r in results:
        vusers.extend(r.get())

    return vusers

if __name__ == '__main__':
    import random
    import string
    users = [random.sample(string.ascii_uppercase, 5) for i in range(10000)]
    vusers = main(users)
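For comparison, the apply_async loop above can be collapsed with pool.map, which blocks until every chunk is done and returns the per-chunk results in order; a sketch reusing the same get_user:

```python
from multiprocessing import Pool

def get_user(users):
    return [u for u in users if 'N' in u]

def main(users, N=2, chunk=200):
    # One task per slice; map blocks until all slices are processed.
    chunks = [users[i:i + chunk] for i in range(0, len(users), chunk)]
    with Pool(N) as pool:
        results = pool.map(get_user, chunks)
    # Flatten the per-chunk lists back into one result list.
    return [u for r in results for u in r]
```

Because map preserves chunk order, the flattened list keeps the users in their original order, which apply_async only gives you if you collect the AsyncResult objects in submission order as done above.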
Bi Rico
  • I just copied your code and it didn't work. I got the following error: AttributeError: 'module' object has no attribute 'get_user'. Any suggestion to fix that? – user9287 Mar 27 '15 at 02:15
  • Python uses pickle to send information between processes. I suspect that pickle is trying to import the `get_user` function in another process and somehow getting confused. Try copying the above code into a new file with a new filename then it should work fine. – Bi Rico Mar 27 '15 at 06:49