I occasionally get a RuntimeError (no more than about 1% of the time) when using ThreadPool from multiprocessing.pool in Python.
I have read that this happens if one tries to open hundreds of threads. In my case there should be at most 4 threads, so I am confused about why this is happening.
I previously used ThreadPool with 3 threads in the exact same environment and never got an error.
My code is:
import time
from multiprocessing.pool import ThreadPool

while True:
    qty_fetched = 6
    time.sleep(random_secs(0.5))
    pending_updates = fetch_pending_updates(qty_fetched)  # list of dicts
    if pending_updates:
        prio = pending_updates[0]['prio']  # variable number between 0 and 4 (edited from original question)
        if prio > 3:
            qty_threads = 1
        elif prio == 0 or prio == 1:
            qty_threads = 4
        else:
            qty_threads = 3
        pool = ThreadPool(qty_threads)
        pool.map(self.run_update_NEW, pending_updates)  # a list of 6 dicts will be given to the pool of 1, 3 or 4 threads
    else:
        time.sleep(2)
And the Traceback:
...
    pool = ThreadPool(qty_threads)
  File "/app/.heroku/python/lib/python3.6/multiprocessing/pool.py", line 789, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "/app/.heroku/python/lib/python3.6/multiprocessing/pool.py", line 192, in __init__
    self._task_handler.start()
  File "/app/.heroku/python/lib/python3.6/threading.py", line 846, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
Any ideas what the problem is?
Attempt:
From here I learned about ThreadPoolExecutor.
I decided to give it a try:
import time
from concurrent.futures import ThreadPoolExecutor

while True:
    qty_fetched = 6
    time.sleep(random_secs(0.5))
    pending_updates = fetch_pending_updates(qty_fetched)  # list of dicts
    if pending_updates:
        prio = 2  # some variable number between 0 and 4
        if prio > 3:
            qty_threads = 1
        elif prio == 0 or prio == 1:
            qty_threads = 4
        else:
            qty_threads = 3
        # the following lines changed
        with ThreadPoolExecutor(max_workers=qty_threads) as e:
            for pu in pending_updates:
                e.submit(self.run_update_NEW, pu)
    else:
        time.sleep(2)
I will update the post to report whether this works.
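One thing that may be worth checking is whether threads from earlier iterations are still alive, since the first snippet creates a new ThreadPool on every pass of the loop and never calls close() or join() on it. The sketch below is only a diagnostic under that assumption, not a confirmed cause. The names run_batch and thread_delta_after_batches are hypothetical helpers, and the lambda stands in for self.run_update_NEW. It releases the pool explicitly after each batch and uses threading.active_count() to see whether the thread count returns to its starting value.

```python
import threading
from multiprocessing.pool import ThreadPool


def run_batch(func, items, qty_threads):
    """Run func over items on a short-lived pool, then release its threads.

    Hypothetical helper: it wraps the ThreadPool usage from the question
    and adds an explicit close() and join().
    """
    pool = ThreadPool(qty_threads)
    try:
        return pool.map(func, items)
    finally:
        pool.close()  # no more tasks will be submitted to this pool
        pool.join()   # wait for the worker and handler threads to exit


def thread_delta_after_batches(rounds=20, qty_threads=4):
    """Return how many extra threads are alive after `rounds` batches."""
    before = threading.active_count()
    for _ in range(rounds):
        # The lambda is a stand-in for the real update function.
        run_batch(lambda x: x * 2, range(6), qty_threads)
    return threading.active_count() - before


if __name__ == "__main__":
    print("extra threads alive:", thread_delta_after_batches())
```

If the delta stays at 0 with the explicit close() and join() but grows when the same measurement is taken around the original loop, leftover pool threads would be a plausible explanation for the occasional error.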