
I'm running Python multiprocessing and I want to stop a process if it takes more than n seconds. This is what I have right now. How can I cancel one of these processes if it takes more than 60 seconds?

import multiprocessing as mp

pool = mp.Pool(processes=2)

arguments = [(self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 0, 100, 15),     # dbz-Level: 15 dbz
             (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 15, 100, 28),    # dbz-Level: 28 dbz
             (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 28, 100, 42),    # dbz-Level: 42 dbz
             (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 42, 100, 55),    # dbz-Level: 55 dbz
             (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 55, 100, 75),    # dbz-Level: 75 dbz
             (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 75, 100, 85)]    # dbz-Level: 85 dbz

self.dbz_features = pool.starmap(dbz_processing, arguments)

pool.close()
pool.join()

dbz_processing is just a simple function - nothing special.

best regards

Woodz
    Does this answer your question? [Python multiprocessing module: join processes with timeout](https://stackoverflow.com/questions/26063877/python-multiprocessing-module-join-processes-with-timeout) – Kraay89 Aug 30 '23 at 13:18
  • Unfortunately not. It's not a solution for using multiprocessing.Pool. – Woodz Aug 31 '23 at 14:34

3 Answers


I don't know of a solution specific to multiprocessing since I only use threading, but one thing you could do is have the function dbz_processing check how long it has been running and return early once it has exceeded the time limit. You can do this by getting the "epoch" in seconds or nanoseconds with the time.time or time.time_ns function and storing the result in a variable when the function starts. When you want to check how much time has passed, get the epoch again and subtract the stored value. Below is an example:

import time
...
def dbz_processing(...):
    time_limit = 60
    start_time = time.time()  # Epoch in seconds when the function starts
    while ...:  # Task condition here
        # Do stuff
        if time.time() - start_time > time_limit:
            return "early exit"
    else:
        # A while-else runs only when the loop condition becomes false,
        # i.e. when the task finished without hitting the early return
        return "normal exit"

If you have to reset the timer at any point in the function, just assign time.time() to start_time again.

Also, if your project is going to need multiple timers, I suggest creating a Timer class to make your code look neater. Below is a very simple example:

import time


class Timer:
    def __init__(self):
        self.start_time = 0
        self.reset()

    def reset(self):
        # Restart the timer from the current moment
        self.start_time = time.time()

    def get_time(self):
        # Seconds elapsed since the last reset
        return time.time() - self.start_time

With a class like that, you can construct a new instance whenever you need a timer, call 'reset' to restart it any number of times, and call 'get_time' to see how many seconds have passed since the last reset. You can also easily change the class to measure in nanoseconds by replacing 'time.time()' with 'time.time_ns()'.
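
For illustration, here is a minimal usage sketch of that Timer class; the one-second sleep and the five-second limit are made up for the example:

import time

timer = Timer()
while timer.get_time() < 5:  # Run for at most five seconds
    time.sleep(1)            # Placeholder for one unit of real work
    print(f"{timer.get_time():.1f} s elapsed")
timer.reset()                # The timer counts from zero again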

  • The problem with your solution is that I have to wait until "# Do stuff" finishes. But what if "# Do stuff" exceeds the time limit? – Woodz Aug 31 '23 at 14:39
  • @Woodz Yeah, for my solution you'll have to break your task into smaller pieces if you don't want it to exceed the time limit by too much. If that's not possible, then my solution won't work for you. – I Like Python Sep 02 '23 at 09:37

I found out that I don't have to use pool.starmap(); I can use pool.starmap_async() instead. That way I am able to wait a couple of seconds for the result and check whether the returned AsyncResult object is ready. It's not exactly what I wanted, but it's a first step.

with mp.Pool(processes=num_prc) as pool:

    timelimit = 240  # seconds

    arguments = [(self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 0, 100, 15),     # dbz-Level: 15 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 15, 100, 28),    # dbz-Level: 28 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 28, 100, 42),    # dbz-Level: 42 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 42, 100, 55),    # dbz-Level: 55 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 55, 100, 75),    # dbz-Level: 75 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 75, 100, 85)]    # dbz-Level: 85 dbz

    result = pool.starmap_async(dbz_processing, arguments)

    # Block for at most `timelimit` seconds:
    result.wait(timeout=timelimit)

    if result.ready():
        self.dbz_features = result.get(timeout=1)
    else:
        self.dbz_features = []
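
Worth noting: because the pool is used as a context manager here, leaving the with block calls pool.terminate(), so workers still running after the timeout are killed at that point rather than left running in the background.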
Woodz

I presume you want to impose a time limit of 60 seconds on each submitted task. You should then use the apply_async method, which allows you to test each submitted task individually for completion. This gets a bit tricky depending on how you want to do the timing:

First, you submit all the tasks with apply_async, which returns an AsyncResult instance for each submitted task. If you want to allow 60 seconds for all tasks to complete, compute that completion time by adding 60 to the current time. Then loop through the AsyncResult instances calling get, passing a timeout equal to the maximum time left to wait, computed by subtracting the current time from the completion time. So if we had to wait, for example, 2 seconds for the first submitted task to complete, then we should wait at most 58 seconds for the next task to complete:

import multiprocessing as mp
import time

# To support platforms that create child processes using
# the "spawn" method (e.g. Windows):
if __name__ == '__main__':
    
    pool = mp.Pool(processes=2)
                    
    arguments = [(self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 0, 100, 15),     # dbz-Level: 15 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 15, 100, 28),    # dbz-Level: 28 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 28, 100, 42),    # dbz-Level: 42 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 42, 100, 55),    # dbz-Level: 55 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 55, 100, 75),    # dbz-Level: 75 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 75, 100, 85)]    # dbz-Level: 85 dbz
            

    async_results = [
        pool.apply_async(dbz_processing, args=argument)
        for argument in arguments
    ]

    # None means that the task timed out and thus
    # there is no result:
    self.dbz_features = [None] * len(arguments)

    completion_time = time.time() + 60 # 60 seconds from now

    for idx, async_result in enumerate(async_results):
        # How much time is left on the 60 second clock:
        time_left = completion_time - time.time()
        if time_left <= 0:
            break  # Time is up; remaining tasks are terminated below
        try:
            result = async_result.get(time_left)
        except mp.TimeoutError:
            pass
        else:
            self.dbz_features[idx] = result

    pool.terminate() # Kill any uncompleted tasks
    pool.join()

If instead you want to give each submitted task a full 60 seconds to complete from the time the pool process starts working on it, the problem becomes knowing when each submitted task is pulled off the pool's task queue to be worked on. Remember that your pool size is only 2 and you are submitting more than 2 tasks, so the submitted tasks cannot all start at the same time. The best you can do is increase the pool size to equal the number of tasks being submitted, assume that all tasks start simultaneously, and then use the same logic as above (see the sketch below). But creating such a large pool might not be efficient. I would recommend you use an optimal pool size and come up with an expiration time for all tasks to be completed, realizing that your tasks will not all be running simultaneously.
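
A minimal sketch of that pool-per-task variant, reusing dbz_processing and the arguments list from above (illustrative only, since a pool this large may be wasteful):

pool = mp.Pool(processes=len(arguments))  # One worker per task
async_results = [
    pool.apply_async(dbz_processing, args=argument)
    for argument in arguments
]

# With as many workers as tasks, all tasks start at (roughly) the
# same time, so one shared 60-second deadline approximates a
# 60-second limit per task:
completion_time = time.time() + 60
self.dbz_features = [None] * len(arguments)

for idx, async_result in enumerate(async_results):
    time_left = completion_time - time.time()
    if time_left <= 0:
        break
    try:
        self.dbz_features[idx] = async_result.get(time_left)
    except mp.TimeoutError:
        pass

pool.terminate()  # Kill any uncompleted tasks
pool.join()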

Booboo