
How can I limit the number of concurrent threads in Python?

For example, I have a directory with many files, and I want to process all of them, but only 4 at a time in parallel.

Here is what I have so far:

import glob
import Queue
import threading

def process_file(fname):
    # open file and do something

def process_file_thread(queue, fname):
    queue.put(process_file(fname))

def process_all_files(d):
    files = glob.glob(d + '/*')
    q = Queue.Queue()
    for fname in files:
        t = threading.Thread(target=process_file_thread, args=(q, fname))
        t.start()
    q.join()

def main():
    process_all_files('.')
    # Do something after all files have been processed

How can I modify the code so that only 4 threads are run at a time?

Note that I want to wait for all files to be processed and then continue and work on the processed files.

Frank
  • Have you tried [multiprocess](http://docs.python.org/2/library/multiprocessing.html#module-multiprocessing) Pools? On Python 3 you can also use [futures](http://docs.python.org/dev/library/concurrent.futures.html). – javex Aug 21 '13 at 00:56
  • You can use [`futures`](https://pypi.python.org/pypi/futures) in Python 2 also, you just need to install the backport. – abarnert Aug 21 '13 at 00:57
  • concurrent.futures is indeed the best way to do it – JBernardo Aug 21 '13 at 00:58
  • You could use a `multiprocessing.pool.ThreadPool` to easily limit the number of threads, as shown in [this answer](http://stackoverflow.com/a/18283388/355230) to another question. – martineau Aug 21 '13 at 02:17
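
For reference, a minimal sketch of the `multiprocessing.pool.ThreadPool` approach mentioned in the last comment, reusing process_file from the question (the rest of the naming is illustrative):

import glob
from multiprocessing.pool import ThreadPool

def process_all_files(d):
    files = glob.glob(d + '/*')
    pool = ThreadPool(4)                     # never more than 4 worker threads
    results = pool.map(process_file, files)  # blocks until every file is processed
    pool.close()
    pool.join()
    return results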

2 Answers


> For example, I have a directory with many files, and I want to process all of them, but only 4 at a time in parallel.

That's exactly what a thread pool does: You create jobs, and the pool runs 4 at a time in parallel. You can make things even simpler by using an executor, where you just hand it functions (or other callables) and it hands you back futures for the results. You can build all of this yourself, but you don't have to.*

The stdlib's concurrent.futures module is the easiest way to do this. (For Python 2.x and versions before 3.2, see the backport.) In fact, one of the main examples in its documentation is very close to what you want to do. But let's adapt it to your exact use case:

import concurrent.futures
import glob

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        concurrent.futures.wait(fs)

If you wanted process_file to return something, that's almost as easy:

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        for f in concurrent.futures.as_completed(fs):
            do_something(f.result())

And if you want to handle exceptions too… well, just look at the example; it's just a try/except around the call to result().
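
For instance, a minimal sketch of that error handling, reusing the fs list from the snippet above (the message text is just illustrative):

for f in concurrent.futures.as_completed(fs):
    try:
        do_something(f.result())
    except Exception as e:
        # result() re-raises whatever exception the worker raised
        print('a file failed to process: {}'.format(e))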


* If you want to build them yourself, it's not that hard. The source to multiprocessing.pool is well written and commented, and not that complicated, and most of the hard stuff isn't relevant to threading; the source to concurrent.futures is even simpler.
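
As a rough illustration only, here's one way such a hand-rolled pool could look, built on the same glob/threading/Queue modules as the question (the num_workers name and the structure are my own, not taken from either library):

import glob
import threading
import Queue  # 'queue' on Python 3

def process_all_files(d, num_workers=4):
    q = Queue.Queue()
    for fname in glob.glob(d + '/*'):  # enqueue all the work up front
        q.put(fname)

    def worker():
        # each worker pulls file names until the queue is drained
        while True:
            try:
                fname = q.get_nowait()
            except Queue.Empty:
                return
            process_file(fname)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # returns only after every file has been processed

Because every file name is queued before the workers start, Queue.Empty reliably means the work is done.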

abarnert

I used this technique a few times; I think it's a bit ugly though:

import threading

def process_something():
    something = list(get_something)  # get_something: whatever yields your work items

    def worker():
        # each thread keeps popping items until the shared list is empty
        while something:
            obj = something.pop()
            # do something with obj

    threads = [threading.Thread(target=worker) for i in range(4)]
    [t.start() for t in threads]
    [t.join() for t in threads]
Ski