I presume you want to impose a time limit of 60 seconds on each submitted task. You should then use the `apply_async` method, which will allow you to test for the completion of each individual task submitted. This gets a bit tricky depending on how you want to do the timing:

First, submit all the tasks with `apply_async`, which returns an `AsyncResult` instance for each submitted task. If you want to allow 60 seconds for all tasks to complete, compute what that completion time would be by adding 60 to the current time. Then loop through the `AsyncResult` instances calling `get`, passing a timeout value equal to the maximum time left to wait, computed by subtracting the current time from the completion time. So if we had to wait, for example, 2 seconds for the first submitted task to complete, then we should only wait a maximum of 58 seconds for the next task to complete:
import multiprocessing as mp
import time

# To support platforms that create child processes using
# the "spawn" method (e.g. Windows):
if __name__ == '__main__':
    pool = mp.Pool(processes=2)
    arguments = [(self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 0, 100, 15),   # dbz-Level: 15 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 15, 100, 28),  # dbz-Level: 28 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 28, 100, 42),  # dbz-Level: 42 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 42, 100, 55),  # dbz-Level: 55 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 55, 100, 75),  # dbz-Level: 75 dbz
                 (self.meshgrid_lat, self.meshgrid_lon, self.dbzArray, 75, 100, 85)]  # dbz-Level: 85 dbz
    async_results = [
        pool.apply_async(dbz_processing, args=argument)
        for argument in arguments
    ]
    # None means that the task timed out and thus
    # there is no result:
    self.dbz_features = [None] * len(arguments)
    completion_time = time.time() + 60  # 60 seconds from now
    for idx, async_result in enumerate(async_results):
        # How much time is left on the 60-second clock:
        time_left = completion_time - time.time()
        if time_left <= 0:
            break  # Deadline passed; stop waiting on remaining tasks
        try:
            result = async_result.get(timeout=time_left)
        except mp.TimeoutError:
            pass
        else:
            self.dbz_features[idx] = result
    pool.terminate()  # Kill any uncompleted tasks
    pool.join()
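Here is the same pattern in a self-contained, runnable form. `slow_square` and its `(x, delay)` argument tuples are stand-ins for your `dbz_processing` function and its arguments, and the deadline is shortened so the demo runs quickly:

```python
import multiprocessing as mp
import time

def slow_square(x, delay):
    # Stand-in worker: sleeps for `delay` seconds, then returns x squared.
    time.sleep(delay)
    return x * x

def run_with_deadline(tasks, deadline=60.0, processes=2):
    # tasks is a list of (x, delay) argument tuples; returns a list
    # where None marks a task that did not finish before the deadline.
    results = [None] * len(tasks)
    pool = mp.Pool(processes=processes)
    async_results = [pool.apply_async(slow_square, args=t) for t in tasks]
    completion_time = time.time() + deadline
    for idx, async_result in enumerate(async_results):
        # How much time is left on the shared clock:
        time_left = completion_time - time.time()
        if time_left <= 0:
            break  # Deadline passed; leave the rest as None
        try:
            results[idx] = async_result.get(timeout=time_left)
        except mp.TimeoutError:
            pass
    pool.terminate()  # Kill any uncompleted tasks
    pool.join()
    return results

if __name__ == '__main__':
    # The first two tasks finish quickly; the third sleeps well past
    # the shortened 1-second deadline and is abandoned:
    print(run_with_deadline([(2, 0.1), (3, 0.1), (4, 10.0)], deadline=1.0))
```

Note that `time_left` shrinks with each `get` call, so the 60-second budget is shared across all tasks rather than granted to each one individually.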
If instead you want to give each submitted task a full 60 seconds to complete from the time a pool process starts working on it, the problem becomes knowing at what time each submitted task is pulled off the pool's task queue to be worked on. Remember that your pool size is only 2 and you are submitting more than 2 tasks, so not all submitted tasks can start at the same time. The best you can do is increase the pool size to equal the number of tasks being submitted, assume that all tasks start simultaneously, and then use the same logic as above. But creating such a large pool might not be efficient. I would recommend you use an optimal pool size and come up with an expiration time for all tasks to complete, realizing that your tasks will not all be running simultaneously.
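To sketch that last option (with a hypothetical `slow_double` worker standing in for your real task), sizing the pool to the number of tasks means every task is pulled off the queue immediately, so a single shared deadline effectively becomes a per-task deadline:

```python
import multiprocessing as mp
import time

def slow_double(x, delay):
    # Hypothetical worker: sleeps for `delay` seconds, then doubles x.
    time.sleep(delay)
    return 2 * x

def run_all_at_once(tasks, per_task_limit):
    # One worker per task, so all tasks start (roughly) simultaneously
    # and the shared deadline is a true per-task limit.
    pool = mp.Pool(processes=len(tasks))
    async_results = [pool.apply_async(slow_double, args=t) for t in tasks]
    deadline = time.time() + per_task_limit
    results = []
    for async_result in async_results:
        time_left = max(deadline - time.time(), 0)
        try:
            results.append(async_result.get(timeout=time_left))
        except mp.TimeoutError:
            results.append(None)  # This task exceeded its limit
    pool.terminate()  # Kill any tasks still running
    pool.join()
    return results

if __name__ == '__main__':
    # Two fast tasks and one that sleeps past the 1-second limit:
    print(run_all_at_once([(1, 0.1), (2, 0.2), (3, 5.0)], per_task_limit=1.0))
```

The trade-off is exactly the one described above: a pool with one process per task may be wasteful if you have many tasks, since most of those processes sit idle once their single task completes.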