
I am using the Python multiprocessing library. Whenever one of the processes throws a timeout error, my application exits. I want to keep the processes running.

I have a function that subscribes to a queue and listens to incoming messages:

def process_msg(i):
    # get a new message from the queue
    # process it
    import time
    time.sleep(10)
    return True

I have created a Pool that creates 6 processes and executes the process_msg() function above. When the function times out, I want the Pool to call the function again and wait for new messages instead of exiting:


if __name__ == "__main__":

    from multiprocessing import Pool, TimeoutError

    pool = Pool(processes=6)
    collection = range(6)
    val = pool.map_async(process_msg, collection)
    try:
        res = val.get(5)
    except TimeoutError:
        print('timeout here')
    pool.close()
    pool.terminate()
    pool.join()

The code runs and when I get a timeout, the application terminates itself.

What I want it to do is print that a timeout has occurred and call the same function again.

What's the right approach?

InfoLearner
  • Try putting your code in a while True loop if you want to monitor continuously. You are closing everything in your code. try/except is for handling the exception, not for looping until you get an answer. – Raady Jun 26 '20 at 09:28
  • Thanks. What is a better solution? Essentially, I don't want the processes to die, and I want 6 running all the time to handle 6 messages concurrently. – InfoLearner Jun 26 '20 at 09:32
  • Pardon my ignorance, but the time module doesn't seem to have a function called 'timeout'. Is your process_msg actually working? – Roy2012 Jun 26 '20 at 09:33
  • I have corrected the typo to time.sleep(). Thank you for pointing out. I have added time.sleep() for illustration purpose only. In my program, the method is called and works when a message arrives within the time limit. If a timeout occurs then the application dies. – InfoLearner Jun 26 '20 at 09:35

1 Answer


Here's a skeleton for a program that works. The main issue you had is the use of pool.terminate, which "Stops the worker processes immediately without completing outstanding work" (see the documentation).

from multiprocessing import Pool, TimeoutError

def process_msg(i):
    # get a new message from the queue
    # process it
    import time
    print(f"Starting to sleep, process # {i}")
    time.sleep(10)
    return True

def main():
    print("in main")
    pool = Pool(processes=6)
    collection = range(6)
    print("About to spawn sub processes")
    val = pool.map_async(process_msg, collection)
    while True: 
        try:
            print("Waiting for results")
            res = val.get(3)
            print(f"Res is {res}")
            break
        except TimeoutError:
            print("Timeout here")

    print("Closing pool")
    pool.close()
    # pool.terminate()  # do not terminate - it kills the child processes
    print("Joining pool")
    pool.join()
    print("exiting main")

if __name__ == "__main__":
    main()
    

The output of this code is:

in main
About to spawn sub processes
Waiting for results
Starting to sleep, process # 0
Starting to sleep, process # 1
Starting to sleep, process # 2
Starting to sleep, process # 3
Starting to sleep, process # 4
Starting to sleep, process # 5
Timeout here
Waiting for results
Timeout here
Waiting for results
Timeout here
Waiting for results
Res is [True, True, True, True, True, True]
Closing pool
Joining pool
exiting main
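The skeleton above waits on a single batch and then shuts down. If, as the question suggests, the workers should stay up and keep taking new batches after each timeout, the same pattern can be wrapped in a dispatch loop. This is only a sketch: run_batches, the batch count, and the short sleep/timeout values are illustrative stand-ins, not part of the original code.

```python
from multiprocessing import Pool, TimeoutError
import time

def process_msg(i):
    # stand-in for: get a new message from the queue and process it
    time.sleep(0.2)
    return True

def run_batches(n_batches, timeout=0.05):
    """Dispatch n_batches batches of 6 tasks each. On a timeout,
    keep waiting on the same batch instead of exiting, so the
    worker processes stay alive for the next batch."""
    results = []
    pool = Pool(processes=6)
    for _ in range(n_batches):
        val = pool.map_async(process_msg, range(6))
        while True:
            try:
                results.append(val.get(timeout))
                break
            except TimeoutError:
                # workers are still alive and the tasks are still
                # running; just report the timeout and wait again
                print("Timeout, batch still running")
    pool.close()   # no more work will be submitted
    pool.join()    # let outstanding work finish - no terminate()
    return results

if __name__ == "__main__":
    print(run_batches(2))
```

Each timeout here only interrupts the wait, never the pool itself, which is why close/join (and not terminate) is still the right way to shut down once the loop ends.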
Roy2012