Newly Updated Answer
If you are a way of looking for timing out without using signals, here is one way. First, since you are using threading, let's make it explicit and let's use the concurrent.futures
module, which has a lot of flexibility.
When a "job" is submitted to the pool executor, a Future
instance is returned immediately without blocking until a result
call is made on that instance. You can specify a timeout
value such that if the result is not available within the timeout period, an exception will be thrown. The idea is to pass to the worker thread the ThreadPoolExecutor
instance and for it to run the critical piece of code that must be completed within a certain time period within its own worker thread. A Future
instance will be created for that timed code but this time the result
call will specify a timeout
value:
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
def main():
listOfLinks = ['a', 'b', 'c', 'd', 'e']
futures = []
"""
To prevent timeout errors due to lack of threads, you need at least one extra thread
in addition to the ones being created here so that at least one time_critical thread
can start. Of course, ideally you would like all the time_critical threads to be able to
start without waiting. So, whereas the minimum number of max_workers would be 6 in this
case, the ideal number would be 5 * 2 = 10.
"""
with ThreadPoolExecutor(max_workers=10) as executor:
# pass executor to our worker
futures = [executor.submit(processRunSeveralTimesInParallel, tuple, executor) for tuple in enumerate(listOfLinks)]
for future in futures:
result = future.result()
print('result is', result)
def processRunSeveralTimesInParallel(tuple, executor):
link_number = tuple[0]
link = tuple[1]
# long running sequence of instructions up until this point and then
# allow 2 seconds for this part:
for i in range(10):
future = executor.submit(time_critical, link, i)
try:
future.result(timeout=2) # time_critical does not return a result other than None
except TimeoutError:
handle_exception(link, i)
return link * link_number
def time_critical(link, trial_number):
if link == 'd' and trial_number == 7:
time.sleep(3) # generate a TimeoutError
def handle_exception(link, trial_number):
print(f'There was a timeout for link {link}, trial number {trial_number}.')
if __name__ == '__main__':
main()
Prints:
result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
Using Threading and Multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, TimeoutError
import os
import time
def main():
listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
futures = []
cpu_count = os.cpu_count()
with ThreadPoolExecutor(max_workers=cpu_count) as thread_executor, ProcessPoolExecutor(max_workers=cpu_count) as process_executor:
# pass executor to our worker
futures = [thread_executor.submit(processRunSeveralTimesInParallel, tuple, process_executor) for tuple in enumerate(listOfLinks)]
for future in futures:
result = future.result()
print('result is', result)
def processRunSeveralTimesInParallel(tuple, executor):
link_number = tuple[0]
link = tuple[1]
# long running sequence of instructions up until this point and then
# allow 2 seconds for this part:
for i in range(10):
future = executor.submit(time_critical, link, i)
try:
future.result(timeout=2) # time_critical does not return a result other than None
except TimeoutError:
handle_exception(link, i)
return link * link_number
def time_critical(link, trial_number):
if link == 'd' and trial_number == 7:
time.sleep(3) # generate a TimeoutError
def handle_exception(link, trial_number):
print(f'There was a timeout for link {link}, trial number {trial_number}.')
if __name__ == '__main__':
main()
Prints:
result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj
Multiprocessing Exclusively
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process
import os
import time
def main():
listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
futures = []
workers = os.cpu_count() // 2
with ProcessPoolExecutor(max_workers=workers) as process_executor:
# pass executor to our worker
futures = [process_executor.submit(processRunSeveralTimesInParallel, tuple) for tuple in enumerate(listOfLinks)]
for future in futures:
result = future.result()
print('result is', result)
def processRunSeveralTimesInParallel(tuple):
link_number = tuple[0]
link = tuple[1]
# long running sequence of instructions up until this point and then
# allow 2 seconds for this part:
for i in range(10):
p = Process(target=time_critical, args=(link, i))
p.start()
p.join(timeout=2) # don't block for more than 2 seconds
if p.exitcode is None: # subprocess did not terminate
p.terminate() # we will terminate it
handle_exception(link, i)
return link * link_number
def time_critical(link, trial_number):
if link == 'd' and trial_number == 7:
time.sleep(3) # generate a TimeoutError
def handle_exception(link, trial_number):
print(f'There was a timeout for link {link}, trial number {trial_number}.')
if __name__ == '__main__':
main()
Prints:
result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj