How can I limit the maximum number of open threads to 20 in the following code? I'm aware that similar questions have been asked in the past, but I specifically want to know how this is best done with a Queue, with a working example if possible.

    # b is a list with 10000 items
    threads = [threading.Thread(target=targetFunction, args=(ptf,anotherarg)) for ptf in b]
    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()
Nickpick
  • Have you tried `b[:20]`? Or do you want to process the whole range with 20 threads max? It's not quite clear in your question. – Frédéric Hamidi Feb 02 '16 at 17:52
  • # b is a list with 10000 items and cannot be changed – Nickpick Feb 02 '16 at 17:53
  • So use a thread pool and have `targetFunction` pull from a queue to get its work? – Adam Smith Feb 02 '16 at 17:54
  • So you want to process 20 threads per loop for example? or what? – Iron Fist Feb 02 '16 at 17:57
  • targetFunction is downloading information from a server and I'm not allowed to have more than 20 connections open at a time. That's why I want to limit the maximum number of open threads to 20 – Nickpick Feb 02 '16 at 18:08
  • Have you tried the futures library in Python 3? There is a port from Python 3 to Python 2: https://pypi.python.org/pypi/futures. You can do `executor = futures.ThreadPoolExecutor(10)`, which gives you max 10 threads. Then `executor.submit(function_name, args).result()` gives you the result of the call. – user1157751 Feb 02 '16 at 19:00
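
The `ThreadPoolExecutor` suggestion in the last comment can be sketched as follows for Python 3, where `concurrent.futures` ships with the standard library. The `target_function` body, the `run_all` helper, and the sample list `b` are placeholders assumed for illustration, not the asker's real code.

```python
from concurrent.futures import ThreadPoolExecutor


def target_function(ptf, another_arg):
    # Hypothetical stand-in for the real download function.
    return (ptf, another_arg)


def run_all(items, another_arg, max_threads=20):
    # The pool never runs more than max_threads calls at the same time.
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = [executor.submit(target_function, ptf, another_arg)
                   for ptf in items]
        # result() blocks until that call has finished and re-raises
        # any exception the call raised.
        return [future.result() for future in futures]


b = list(range(100))  # placeholder for the real list of 10000 items
results = run_all(b, "anotherarg")
```

Results come back in the same order as `b` because the futures are collected in submission order.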

1 Answer

The simple way to do this is with a `queue.Queue` for the work, starting the threads with `for _ in range(MAXTHREADS): threading.Thread(target=f, args=(the_queue,)).start()`. I find this easier to read by subclassing `Thread`, however. Your mileage may vary.

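    import threading
    import queue

    class Worker(threading.Thread):
        def __init__(self, q, other_arg, *args, **kwargs):
            self.q = q  # the same queue object is shared by all workers
            self.other_arg = other_arg
            super().__init__(*args, **kwargs)

        def run(self):
            while True:
                try:
                    work = self.q.get(timeout=3)  # 3s timeout
                except queue.Empty:
                    return  # no work left: the thread ends here
                try:
                    # do whatever work you have to do on work, e.g.:
                    targetFunction(work, self.other_arg)
                finally:
                    # always mark the item done, or q.join() never returns
                    self.q.task_done()

    q = queue.Queue()
    for ptf in b:
        q.put_nowait(ptf)
    for _ in range(20):  # at most 20 worker threads
        Worker(q, anotherarg).start()
    q.join()  # blocks until task_done() has been called for every item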

If you're insistent about using a function, I'd suggest wrapping your targetFunction with something that knows how to get from the queue.

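    def wrapper_targetFunc(f, q, somearg):
        # Keep pulling items until the queue stays empty for the whole timeout.
        while True:
            try:
                work = q.get(timeout=3)  # or whatever
            except queue.Empty:
                return
            try:
                f(work, somearg)
            finally:
                # always mark the item done, or q.join() never returns
                q.task_done()

    q = queue.Queue()
    for ptf in b:
        q.put_nowait(ptf)
    for _ in range(20):  # at most 20 worker threads
        threading.Thread(target=wrapper_targetFunc,
                         args=(targetFunction, q, anotherarg)).start()
    q.join()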
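
In both versions the `timeout` is what lets a worker exit: once the queue stays empty for 3 seconds, `get` raises `queue.Empty` and the thread returns. A common alternative, sketched here as an assumption rather than as part of the code above, is to enqueue one sentinel per worker so the threads stop without waiting. `STOP`, `worker`, `run_with_sentinels`, `processed`, and the stand-in `targetFunction` are hypothetical names.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel meaning "no more work"
processed = []
processed_lock = threading.Lock()


def targetFunction(ptf, anotherarg):
    # Stand-in for the real download function: just record the call.
    with processed_lock:
        processed.append((ptf, anotherarg))


def worker(q, somearg):
    while True:
        work = q.get()  # blocks until an item or a sentinel arrives
        try:
            if work is STOP:
                return
            targetFunction(work, somearg)
        finally:
            q.task_done()


def run_with_sentinels(items, somearg, max_threads=20):
    q = queue.Queue()
    for item in items:
        q.put(item)
    for _ in range(max_threads):
        q.put(STOP)  # one sentinel per worker, queued after the real work
    threads = [threading.Thread(target=worker, args=(q, somearg))
               for _ in range(max_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # every worker exits after taking exactly one sentinel


run_with_sentinels(range(100), "x")
```

Because each worker stops at its first sentinel and the sentinels sit behind all real items in the FIFO queue, every item is processed before the last thread exits.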
Adam Smith
  • That looks quite good, but it will require quite a few changes in my code if I want to put it in a class. Is there a way to just call a function instead? – Nickpick Feb 02 '16 at 18:41
  • Looks almost perfect, but apparently the target function now requires two arguments. Do I need to pass q into it? (This is only a cosmetic problem.) Otherwise working perfectly. Thanks! – Nickpick Feb 02 '16 at 18:56
  • @Nicolas My code, above, seems to work perfectly. I'm not sure what you're experiencing. – Adam Smith Feb 02 '16 at 19:00
  • Correction: there's a problem, it always only takes the first element of b for some reason... – Nickpick Feb 02 '16 at 19:00
  • Note that your thread's target is now the wrapper function, which has three arguments: the function to do work with (`targetFunction`), the queue to get data from (`q`), and your last argument. – Adam Smith Feb 02 '16 at 19:01
  • @Nicolas Unless you're putting it back into the queue at some point in your `targetFunction`, it's not doing that :). Try substituting `targetFunction` with something like `print` to see what's going on. – Adam Smith Feb 02 '16 at 19:02
  • `TypeError: targetFunction() takes 1 positional argument but 2 were given`, so I added q to the target function as a parameter, but I'm not sure why this is necessary. – Nickpick Feb 02 '16 at 19:04
  • @Nicolas then the sample code in your question is wrong, since it supplies two arguments to `targetFunction`. – Adam Smith Feb 02 '16 at 19:04
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/102396/discussion-between-adam-smith-and-nicolas). – Adam Smith Feb 02 '16 at 19:07
  • All working now. Excellent! – Nickpick Feb 02 '16 at 19:09
  • One last question: why do we need the `timeout=3`? – Nickpick Feb 02 '16 at 19:11
  • If two threads tried to pull from the queue at exactly the same time, one would block until the other had finished its `get`. That's not possible in CPython due to the GIL, but I have no way of knowing that you're using a version of Python that won't run threads concurrently. `3` is arbitrary. `timeout=1` should work identically. – Adam Smith Feb 02 '16 at 19:15
  • Is it not necessary to mark the threads as Desmond=True? – Nickpick Feb 02 '16 at 22:48
  • @Nicolas I assume you mean `daemon=True`. No it's not -- the thread dies when `run` returns, which happens in this code when `q.get` throws a `queue.Empty` exception. – Adam Smith Feb 02 '16 at 22:55
  • Does a copy happen on `self.q = q`, or do all workers use the same shared queue? – mrgloom Jul 04 '19 at 09:27
  • @mrgloom all workers share a queue (that's how the queue works -- load it up with tasks and workers will take tasks from it as they can.) – Adam Smith Jul 04 '19 at 09:36
  • Is it intended that `super().__init__(*args, **kwargs)` is called after the other variables are initialized? – mrgloom Jul 04 '19 at 09:49
  • @mrgloom either should be fine. – Adam Smith Jul 04 '19 at 09:50
  • What is the difference between the queue's `put()` and `put_nowait()`? – shivaraj karki Sep 07 '20 at 05:16
  • @shivarajkarki `q.put` will block forever if the queue is full, waiting for the queue to be emptied enough to put a new item into it. `q.put_nowait` will throw an exception if the queue is full when it tries to enqueue a new item. – Adam Smith Sep 07 '20 at 06:09
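
The difference described in the last comment only shows up on a bounded queue; the answer's code uses an unbounded `queue.Queue()`, which never fills up. A small sketch, assuming a `maxsize` of 2 chosen purely for illustration:

```python
import queue

q = queue.Queue(maxsize=2)  # bounded: holds at most 2 items
q.put_nowait("a")
q.put_nowait("b")

try:
    q.put_nowait("c")  # the queue is full, so this raises immediately
    raised = False
except queue.Full:
    raised = True

try:
    # put() waits for a free slot; with a timeout it gives up
    # after that long and raises queue.Full as well.
    q.put("c", timeout=0.1)
    timed_out = False
except queue.Full:
    timed_out = True

q.get()     # free one slot
q.put("c")  # returns at once now that there is room
```

Without the `timeout` argument, the second `put` would wait until another thread removed an item.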