
I am getting a RuntimeError sometimes (not more than 1% of the time, I would say) when using ThreadPool from multiprocessing.pool in Python.

I have read that this happens if one tries to open hundreds of threads. In my case, it should be a maximum of 4 threads, so I am a bit confused why this is happening.

I had previously used ThreadPool with 3 threads in the exact same environment and never got an error.

My code is:

import time
from multiprocessing.pool import ThreadPool

while True:
    qty_fetched = 6
    time.sleep(random_secs(0.5))
    pending_updates = fetch_pending_updates(qty_fetched) #list of dicts

    if pending_updates:
        prio = pending_updates[0]['prio'] #variable number between 0 and 4 (edited from original question)

        if prio > 3:
            qty_threads = 1

        elif prio == 0 or prio == 1:
            qty_threads = 4

        else:
            qty_threads = 3

        pool = ThreadPool(qty_threads)
        pool.map(self.run_update_NEW, pending_updates) #a list of 6 dicts will be given to the pool of 1, 3 or 4 threads

    else:
        time.sleep(2)

And the Traceback:

...
pool = ThreadPool(qty_threads) 
File "/app/.heroku/python/lib/python3.6/multiprocessing/pool.py", line 789, in __init__ 
Pool.__init__(self, processes, initializer, initargs) 
File "/app/.heroku/python/lib/python3.6/multiprocessing/pool.py", line 192, in __init__ 
self._task_handler.start() 
File "/app/.heroku/python/lib/python3.6/threading.py", line 846, in start 
_start_new_thread(self._bootstrap, ()) 
RuntimeError: can't start new thread 

Any ideas of what the problem is?


Attempt:

From here I got to know about ThreadPoolExecutor.

I have decided to give it a try:

import time
from concurrent.futures import ThreadPoolExecutor

while True:
    qty_fetched = 6
    time.sleep(random_secs(0.5))
    pending_updates = fetch_pending_updates(qty_fetched) #list of dicts

    if pending_updates:
        prio = 2 #some variable number between 0 and 4

        if prio > 3:
            qty_threads = 1

        elif prio == 0 or prio == 1:
            qty_threads = 4

        else:
            qty_threads = 3

        #the following lines changed
        with ThreadPoolExecutor(max_workers=qty_threads) as e:
            for pu in pending_updates:
                e.submit(self.run_update_NEW, pu)

    else:
        time.sleep(2)

I will keep the post updated explaining if this works.

J0ANMM

1 Answer


An issue I can see in your code is that you have an infinite while True loop in which you create your pool, but you never actually close it. You keep creating new pools, and since you never close and join the old ones, their threads most likely just hang around; a fraction of a second later you create more of them. My guess is that you eventually exhaust your resources and hit a process or kernel thread limit somewhere.

I would move pool creation outside of the while loop and keep using the same pool inside the loop. That is the whole idea of a pool: to have processes or threads waiting for work to appear, removing the process/thread creation overhead when launching repetitive tasks.

If there is a reason to relaunch the pool (I cannot see what that could be; if you need to renew your workers occasionally, you could use maxtasksperchild in your pool declaration), then at least close the old pool properly, as you will not be feeding any more work to it.
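A minimal sketch of this suggestion, with `run_update` and `fetch_pending_updates` stubbed out as placeholders (the real `self.run_update_NEW` and API-backed fetch are not shown in the question), might look like:

```python
import time
from multiprocessing.pool import ThreadPool

def run_update(update):
    # placeholder for the real run_update_NEW; here it just returns the id
    return update["id"]

def fetch_pending_updates(qty):
    # placeholder returning a fixed list of dicts
    return [{"id": i, "prio": 2} for i in range(qty)]

# create the pool ONCE, sized for the maximum concurrency you need
pool = ThreadPool(4)

for _ in range(3):  # stands in for the infinite `while True` loop
    pending_updates = fetch_pending_updates(6)
    if pending_updates:
        # reuse the same pool with each freshly fetched list;
        # the list is mapped at pool.map, not at pool creation
        results = pool.map(run_update, pending_updates)
    else:
        time.sleep(2)

# shut the pool down once the loop ends
pool.close()
pool.join()
```

Sizing the pool for the maximum (4) and reusing it means the priority-based `qty_threads` logic goes away; if limiting concurrency per batch matters, the close/join pattern below each iteration is the alternative.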

Hannu
  • I thought it could be something related to having it inside an infinite loop, but I didn't find how to close the pool, so I figured it would be closed automatically after each instance. I'm not sure if I can move it outside of the loop, since I need `fetch_pending_updates` to fetch a different list every time. Is there any way to close the pool? Would `with ThreadPool(qty_threads) as pool:` work? – J0ANMM May 02 '18 at 10:31
  • But that is the whole point of a Pool. You run fetch_pending_updates and get a new pending_updates list, then you will use pool.map to map this new list to the pool. Trust me, it works. Move your pool definition outside your loop, keep your code exactly as it is and you will see. It works. Your list gets mapped at pool.map, not at pool creation. You can use pool.map as many times as you want, with a different iterable as a parameter. – Hannu May 02 '18 at 10:36
  • 2
    And your with clause will close the pool, so that will work as well. But you are wasting resources creating the pool every 0.5 seconds. You could do it only once. But if you just need to fix the code and not worry about resources, use with ThreadPool and it will fix the closing issue. – Hannu May 02 '18 at 10:37
  • 1
    and if you use `multiprocessing.Pool` as in your original attempt, you will just add `pool.close()` and `pool.join()` statements to your code. Close will stop the pool from accepting any more work but will let workers finish what they are doing, and join will then wait until everything has completed and it will terminate all workers. But the correct way to do this is to create the pool outside your loop and keep using the same pool with a new data set. – Hannu May 02 '18 at 10:40
  • Sorry, it was not clear in my question. `prio` is a variable depending on `fetch_pending_updates`, as I wanted to create a different qty of threads depending on what is in the fetched list. I have updated the question to make it clear. – J0ANMM May 02 '18 at 10:44
  • Concerning the other comment, the function `run_update_NEW` makes a request to an API, which takes a while. That means each loop will normally take between 30 secs and 1 minute. I think the waste of resources is not critical there. Could you edit the answer to show where to place `pool.close()` and `pool.join()`? – J0ANMM May 02 '18 at 10:47
  • 1
    No it is not if this is the case. In this case using `with ThreadPoolExecutor()` is a good way of doing this, and it will take care of garbage collecting and shutting down the pool for you. – Hannu May 02 '18 at 11:03
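The two shutdown options discussed in the comments can be sketched side by side; `run_update` here is a stand-in for the question's `run_update_NEW`:

```python
from multiprocessing.pool import ThreadPool

def run_update(update):
    # stand-in for run_update_NEW
    return update["id"] * 2

pending_updates = [{"id": i} for i in range(6)]

# Option 1: the with statement shuts the pool down automatically on exit
with ThreadPool(3) as pool:
    results = pool.map(run_update, pending_updates)

# Option 2: explicit shutdown, as described in the comments
pool2 = ThreadPool(3)
results2 = pool2.map(run_update, pending_updates)
pool2.close()   # stop accepting new work, let workers finish
pool2.join()    # wait until everything has completed

assert results == results2
```

Either way the old pool's threads are released before the next iteration creates a new pool, which is what the original code was missing.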