1

What I basically want to do is the following:

import threading
import Queue

def test_thread(elem, q):
    q.put(elem ** 2)

a = [1,2,3,4,5,6,7,8]
q = Queue.Queue()
results = []
for x in range(8):
    print x
    threading.Thread(target=test_thread, args=(a[x], q)).start()
    results.append(q.get())

But instead of running all threads at once I want to run only say 2 in parallel adn iterate over the list. Once one thread is done the next value from the list should be processed. I could't find an example and I am not sure how to construct the loops for this.

In addition, I do not understand the behavior of the Queue. I would have expected that ALL squared numbers are in the queue. But instead there is only one value? (The code above has changed to store all results in "results"). Hints, comments, keywords are highly appreciated.

EDIT:

Second question:

Sorry, I thought q.get() will return all the results. But it just gives the elements in a queue like fashion.

Frank V
  • 25,141
  • 34
  • 106
  • 144
Kam Sen
  • 1,098
  • 1
  • 11
  • 14
  • As an aside, using threads to do CPU-bound operations like mathematical calculations isn't a good idea in CPython. Because of the Global Interpreter Lock, only one thread can ever actually run at a time, so you don't get any performance benefit from multi-threading. In fact, it ends up being slower than a single-threaded application, because of the overhead of creating and switching between the threads. You should use the `multiprocessing` module instead, which allows you to get around the limitations of the GIL by using multiple processes instead of multiple threads. – dano Sep 16 '14 at 18:36
  • @dano. Thank you for the background information. I think I came across that. The real application is somewhat more complex. I want to iterate over a large file and hand the generator objects to the threads. These do need a lot of computational steps. Then the results are accumulated for some stats and then written to one result file. If I understood correctly, the benefit with threads would still be rather small here right? (since there is only little I/O involved). – Kam Sen Sep 16 '14 at 19:32
  • Correct. The only I/O is read/writing to disk, which can't really be effectively parallelized, anyway. You'd want to send the computational work off to processes in a `multiprocessing.Pool`, gather the results and write them to disk sequentially. Luckily it's very easy to switch from a `multiprocessing.pool.ThreadPool` to a `multiprocessing.Pool`, since they have identical APIs. – dano Sep 16 '14 at 19:36

1 Answers1

3

You can use a thread pool for this:

import threading
from multiprocessing.pool import ThreadPool

def test_thread(elem):
    return elem ** 2

a = [1,2,3,4,5,6,7,8]
pool = ThreadPool(2) # 2 worker threads
results = []
for x in range(8):
    print x
    results.append(pool.apply_async(test_thread, args=(a[x],)))

results = [result.get() for result in results]
# You can also replace this for loop altogether using pool.map
# and get the same result:
# results = pool.map(test_thread, range(8))
print(results)

Output:

0
1
2
3
4
5
6
7
[1, 4, 9, 16, 25, 36, 49, 64]

The ThreadPool class is a mostly undocumented part of the multiprocessing module. It can also be access via multiprocessing.dummy.Pool. It allows you to create a pool of threads to handle any number of work items, while always limiting the number of work items being concurrently processed to something you specify. You can use the documentation for the normal multiprocessing.Pool to learn about its API. It's exactly the same, except everywhere it says "process", you replace it with "thread".

I'm not sure I follow the second part of your question about Queue.Queue. On each iteration of your for loop, you're putting one item into the Queue inside test_thread, and then consuming it inside the for loop using results.append(q.get()). So while there is never more than one item in the Queue at a time, it is being used to transfer all the values that end up in the results list - one for each item in the range(8) list.

dano
  • 91,354
  • 19
  • 222
  • 219
  • Cheers! Are both versions working for you? I get with the for loop version the error "test_thread() argument after * must be a sequence, not int". The pool.map_async() function works, however. – Kam Sen Sep 16 '14 at 18:10
  • @KamSen Sorry, I had a typo in the `apply_async` call; the `args` argument has to take an iterable, like a one-element tuple (`(a[x],)`). I forgot the trailing comma originally. It's fixed now. I also should have used `map` rather than `map_async`, since we want to wait for all the results to be ready before moving on. – dano Sep 16 '14 at 18:26
  • @Aaron I wrote this answer in 2014. The documentation you cited didn't exist. All that was in the docs was "`multiprocessing.dummy` replicates the API of `multiprocessing` but is no more than a wrapper around the `threading` module." – dano Aug 17 '23 at 02:07
  • my bad.. this showed up on the front page for some reason, and I didn't look at the date – Aaron Aug 17 '23 at 04:13