
I am trying to impose a TimeoutException on a try statement after n seconds. I found that Python's built-in signal module would be perfect for this, but I'm running into an error I'm having a hard time getting around. (A similar SO question is answered using the signal module.)

This is the boiled down code representing the problem:

import multiprocessing
from multiprocessing.dummy import Pool

def main():
    listOfLinks = []
    threadpool = Pool(2)
    info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
    threadpool.close()

def processRunSeveralTimesInParallel(listOfLinks):
    #The following is pseudo code representing what I would like to do:
    loongSequenceOfInstructions()
    for i in range(0,10):
        try for n seconds:
            doSomething(i)
        except (after n seconds):
            handleException()

    return something

When implementing the above question's solution with the signal library, I get the following error:

File "file.py", line 388, in main
    info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 372, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "file.py", line 193, in processRunSeveralTimesInParallel
    signal.signal(signal.SIGALRM, signal_handler)
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/signal.py", line 47, in signal
    handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread
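To confirm this is not specific to multiprocessing.dummy, the same ValueError can be reproduced with a bare thread. A minimal sketch (standard library only; Unix-only because of SIGALRM) showing that CPython refuses to install signal handlers outside the main thread:

```python
import signal
import threading

def install_handler(results):
    # CPython only allows signal.signal() in the main thread;
    # calling it from a worker thread raises ValueError
    try:
        signal.signal(signal.SIGALRM, lambda signum, frame: None)
        results.append("handler installed")
    except ValueError as e:
        results.append(str(e))

results = []
worker = threading.Thread(target=install_handler, args=(results,))
worker.start()
worker.join()
print(results[0])  # "signal only works in main thread ..."
```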

Any idea how to cap the time just on a try block within a method run as a thread? Thank you!

Important information:

  • I am using the multiprocessing library to run several processes in parallel at the same time. From the error above, I suspect that the signal and multiprocessing libraries conflict.
  • The methods inside the try statement are Selenium methods (find_element_by_xpath), which do not accept a timeout argument.

1 Answer


Newly Updated Answer

If you are looking for a way of timing out without using signals, here is one. First, since you are using threading, let's make that explicit and use the concurrent.futures module, which offers a lot of flexibility.

When a "job" is submitted to the pool executor, a Future instance is returned immediately; the call does not block until result is invoked on that instance. result accepts a timeout value, and if the result is not available within the timeout period, an exception is thrown. The idea is to pass the ThreadPoolExecutor instance to the worker thread, which submits the time-critical piece of code to its own worker thread. That, too, creates a Future instance, but this time the result call specifies a timeout value:

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time


def main():
    listOfLinks = ['a', 'b', 'c', 'd', 'e']
    futures = []
    """
    To prevent timeout errors due to lack of threads, you need at least one extra thread
    in addition to the ones being created here so that at least one time_critical thread
    can start. Of course, ideally you would like all the time_critical threads to be able to
    start without waiting. So, whereas the minimum number of max_workers would be 6 in this
    case, the ideal number would be 5 * 2 = 10.
    """
    with ThreadPoolExecutor(max_workers=10) as executor:
        # pass executor to our worker
        futures = [executor.submit(processRunSeveralTimesInParallel, link_tuple, executor) for link_tuple in enumerate(listOfLinks)]
        for future in futures:
            result = future.result()
            print('result is', result)


def processRunSeveralTimesInParallel(link_tuple, executor):
    link_number, link = link_tuple  # unpack; avoids shadowing the built-in tuple
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        future = executor.submit(time_critical, link, i)
        try:
            future.result(timeout=2) # time_critical does not return a result other than None
        except TimeoutError:
            handle_exception(link, i)
    return link * link_number


def time_critical(link, trial_number):
    if link == 'd' and trial_number == 7:
        time.sleep(3) # generate a TimeoutError


def handle_exception(link, trial_number):
    print(f'There was a timeout for link {link}, trial number {trial_number}.')


if __name__ == '__main__':
    main()

Prints:

result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
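One caveat worth noting (a small sketch with hypothetical names, not part of the solution above): the timeout only abandons the wait. The worker thread itself cannot be killed; it keeps running to completion, and Future.cancel() cannot stop a task that has already started:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

done = []

def slow_task():
    time.sleep(1)
    done.append(True)  # still runs even after the caller times out

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    timed_out = False
    try:
        future.result(timeout=0.2)  # give up waiting after 0.2 seconds
    except TimeoutError:
        timed_out = True
    cancelled = future.cancel()     # False: the task is already running

# the with block waits for the worker on exit, so the task did finish
print(timed_out, cancelled, done)  # True False [True]
```

This is usually fine for short-lived work, but if the timed-out code must actually be stopped, a separate process (terminated via multiprocessing, as in the last example below) is required.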

Using Threading and Multiprocessing

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, TimeoutError
import os
import time


def main():
    listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
    futures = []
    cpu_count = os.cpu_count()
    with ThreadPoolExecutor(max_workers=cpu_count) as thread_executor, ProcessPoolExecutor(max_workers=cpu_count) as process_executor:
        # pass executor to our worker
        futures = [thread_executor.submit(processRunSeveralTimesInParallel, link_tuple, process_executor) for link_tuple in enumerate(listOfLinks)]
        for future in futures:
            result = future.result()
            print('result is', result)


def processRunSeveralTimesInParallel(link_tuple, executor):
    link_number, link = link_tuple  # unpack; avoids shadowing the built-in tuple
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        future = executor.submit(time_critical, link, i)
        try:
            future.result(timeout=2) # time_critical does not return a result other than None
        except TimeoutError:
            handle_exception(link, i)
    return link * link_number


def time_critical(link, trial_number):
    if link == 'd' and trial_number == 7:
        time.sleep(3) # generate a TimeoutError


def handle_exception(link, trial_number):
    print(f'There was a timeout for link {link}, trial number {trial_number}.')


if __name__ == '__main__':
    main()

Prints:

result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj

Multiprocessing Exclusively

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process
import os
import time


def main():
    listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
    futures = []
    workers = os.cpu_count() // 2
    with ProcessPoolExecutor(max_workers=workers) as process_executor:
        # pass executor to our worker
        futures = [process_executor.submit(processRunSeveralTimesInParallel, link_tuple) for link_tuple in enumerate(listOfLinks)]
        for future in futures:
            result = future.result()
            print('result is', result)


def processRunSeveralTimesInParallel(link_tuple):
    link_number, link = link_tuple  # unpack; avoids shadowing the built-in tuple
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        p = Process(target=time_critical, args=(link, i))
        p.start()
        p.join(timeout=2) # don't block for more than 2 seconds
        if p.exitcode is None: # subprocess did not terminate
            p.terminate() # we will terminate it
            handle_exception(link, i)
    return link * link_number


def time_critical(link, trial_number):
    if link == 'd' and trial_number == 7:
        time.sleep(3) # generate a TimeoutError


def handle_exception(link, trial_number):
    print(f'There was a timeout for link {link}, trial number {trial_number}.')


if __name__ == '__main__':
    main()

Prints:

result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj
Booboo
  • Hi Booboo, thank you very much for your insightful answer! However, I am not sure this solves my problem. I slightly edited `processRunSeveralTimesInParallel` to make what I'm trying to do more clear. The issue is that I am already in a thread when running the try block. It seems to me that the `ThreadPoolExecutor` submits jobs as entire methods. My `processRunSeveralTimesInParallel` method is quite long and the try block is a smallish part of it. Any idea how to cap the time just on a try block within a method run as a thread? Your help is very much appreciated, thank you! – Antoine Neidecker Jul 14 '20 at 08:06
  • I have updated the answer. The logic is even simpler. – Booboo Jul 14 '20 at 10:47
  • Thank you for the update Booboo. I am trying to put the try/except blocks in a for loop but the loop acts very strangely. It only iterates through the first pass. Do you know how to make `result = future.result(timeout=2)` run in a for loop? Thank you! – Antoine Neidecker Jul 14 '20 at 19:15
  • I have updated the answer yet again. I don't know what difficulty you could have had putting the Future creation and awaiting completion in a loop. Here I am passing the loop index so it only times out for one of the attempts. – Booboo Jul 14 '20 at 19:47
  • Yes, that last comment was due to an error on my part. Thank you so much for your help Booboo. You really got me out of this pickle. Have a good one – Antoine Neidecker Jul 14 '20 at 20:19
  • Hi Booboo, I was wondering how you would account for the case where you have `len(listOfLinks) == 1000`. Only having N CPUs (N being much smaller than 1000), is there a way of setting your `max_workers` so that it only uses N/2 CPUs at a time for threads, and so that the other N/2 CPUs may be used as an additional process for the timing of the try block? I browsed the documentation but haven't found anything. Thank you! – Antoine Neidecker Jul 16 '20 at 12:03
  • First, let's not confuse multi-threading (`ThreadPoolExecutor`) with multiprocessing (`ProcessPoolExecutor`), the former being suitable for I/O-bound work only, due to limitations posed by Python's Global Interpreter Lock, which prevents concurrent execution of Python byte code in separate threads. With multiple *processes*, there is no point in specifying a value for `max_workers` that exceeds the number of CPUs you have, but with lightweight threads that is not the case, although there might be a number where you start to lose efficiency. (... more) – Booboo Jul 16 '20 at 12:46
  • So, your question about allocation of CPUs is really only relevant for multiprocessing. And remember, you have a requirement that ideally the number of threads should be twice the length of your `listOfLinks`. If it is fewer than the minimum number I have set forth, then you will get timeout errors due to the lack of threads. You can always break the list into smaller lists to keep the pool size to a reasonable number. It also sounds like you should be using multiprocessing at least for part of your processing that is CPU intensive. – Booboo Jul 16 '20 at 12:55
  • So, in the worst-case scenario, if everything had to be run using multiprocessing and you had 8 processors, I would break my `listOfLinks` list up into smaller lists of length 4 (8 // 2). I would then have 4 processors to execute `processRunSeveralTimesInParallel` and 4 processors to execute `time_critical`. If only `time_critical` were CPU intensive, I would have `main` also create a `ProcessPoolExecutor` instance and pass it as the argument to `processRunSeveralTimesInParallel`. You should now use no more than 8 threads in your thread pool, but you don't need to break up your list. – Booboo Jul 16 '20 at 13:11
  • You have just demystified a lot of what I've been trying to do. Once again, thank you so much for generously taking the time to answer my questions. Best – Antoine Neidecker Jul 16 '20 at 13:52
  • I've updated answer with example showing threading and multiprocessing. Using multiprocessing exclusively is not as simple as I had suggested: you cannot pass the `ProcessPoolExecutor` instance as an argument to a subprocess. I will have to come up with an alternate later. – Booboo Jul 16 '20 at 13:52
  • Any chance you have a "buymeacoffee" account? I'd really like to make it up to you! – Antoine Neidecker Jul 16 '20 at 17:24
  • It was my pleasure if I was able to help at all. Stay healthy! – Booboo Jul 16 '20 at 18:42
  • Thanks again for everything Booboo. Have a good week! – Antoine Neidecker Jul 21 '20 at 09:12
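The chunk-splitting strategy described in the comments above (breaking `listOfLinks` into sublists sized to half the CPU count) can be sketched like this; `chunk_list` is a hypothetical helper, not part of the answer's code:

```python
def chunk_list(items, chunk_size):
    """Split items into consecutive sublists of at most chunk_size elements."""
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

listOfLinks = [f'link{n}' for n in range(10)]
cpu_count = 8                 # pretend os.cpu_count() returned 8
chunk_size = cpu_count // 2   # half the CPUs drive the outer pool
chunks = chunk_list(listOfLinks, chunk_size)
print(chunks)
# each chunk is then fed to the pool in turn, keeping at most chunk_size
# outer workers (and as many time_critical workers) busy at once
```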