
I have a problem with spawning asynchronous subprocesses with a timeout in Python 3.

What I want to achieve: I want to spawn multiple processes asynchronously without waiting for their results, but I also want to be assured that every spawned process will end within a given timeout.

I have found similar problems here: Using module 'subprocess' with timeout and Asynchronous background processes in Python?, but they do not solve my issue.

My code looks like this. I have a Command class as suggested in Using module 'subprocess' with timeout:

import shlex
import subprocess
import threading

class Command(object):
  def __init__(self, cmd):
    self.cmd = cmd
    self.process = None

  def run(self, timeout):
    def target():
      print('Thread started')
      args = shlex.split(self.cmd)
      self.process = subprocess.Popen(args, shell=True)
      self.process.communicate()
      print('Thread finished')

    thread = threading.Thread(target=target)
    thread.start()

    thread.join(timeout)
    if thread.is_alive():
      print('Terminating process')
      self.process.terminate()
      thread.join()

and then when I want to spawn subprocesses:

for system in systems:
  for service in to_spawn_system_info:
    command_str = "cd {0} && python proc_ip.py {1} {2} 0 2>>{3}".format(home_dir,
        service, system, service_log_dir)
    command = Command(command_str)
    command.run(timeout=60)

When I run this, the output seems to wait for every command to spawn and end. I get

Thread started
Thread finished
Thread started
Thread finished
Thread started
Thread finished
Thread started
Thread finished

So my question is: what am I doing wrong? Now I'm starting to wonder if it is even possible to spawn a process and limit its execution with a timeout.

Why do I need this? The spawner script will run from cron. It will be executed every 10 minutes, and it has to spawn about 20 subprocesses. I want to guarantee that every subprocess ends before the script is run again by cron.

sebast26

3 Answers


As mentioned previously, the call to process.communicate() is making your code wait for the completion of the subprocess. However, if you just remove the communicate() call, the thread will exit immediately after spawning the process, causing your thread.join() call to exit too soon, and you'll kill off the subprocess prematurely. To do what you want without polling or busy waiting, you can set a timer that will kill the process (and runner thread) after a timeout if the process has not yet finished:

import shlex
import subprocess
import threading

class Command(object):
  def __init__(self, cmd):
    self.cmd = cmd
    self.process = None
    self.timer = None

  def run(self, timeout):
    def target():
      print('Thread started')
      # May want/need to skip the shlex.split() when using shell=True.
      # See the Popen() constructor docs on the 'shell' argument for more detail.
      args = shlex.split(self.cmd)
      self.process = subprocess.Popen(args, shell=True)
      self.timer.start()
      self.process.wait()
      self.timer.cancel()

    def timer_callback():
      print('Terminating process (timed out)')
      self.process.terminate()

    thread = threading.Thread(target=target)
    self.timer = threading.Timer(timeout, timer_callback)
    thread.start()
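On Python 3.3 and later, `Popen.wait()` accepts a `timeout` argument directly, so the same idea can be sketched without a `Timer` at all (a minimal sketch, not part of the answer above; the commands passed in are just placeholders):

```python
import subprocess
import threading

def run_with_timeout(cmd, timeout):
    """Spawn cmd asynchronously and kill it if it outlives `timeout` seconds."""
    def target():
        process = subprocess.Popen(cmd, shell=True)
        try:
            process.wait(timeout=timeout)   # blocks at most `timeout` seconds
        except subprocess.TimeoutExpired:
            process.terminate()
            process.wait()                  # reap the terminated child
    thread = threading.Thread(target=target)
    thread.start()
    return thread                           # caller may join() or just move on
```

Here `wait(timeout=...)` raises `TimeoutExpired` instead of blocking forever, which replaces both the `Timer` and its `cancel()` bookkeeping.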
mshildt
  • When I tried this solution, it is not terminating my threads after timeout. I set timeout to 1 sec and added time.sleep(1) in target function. No threads were terminated. – sebast26 May 14 '13 at 13:20
  • Hrmm. The thread should terminate when target() exits. Keep in mind that as written above, you won't get a printout if the process exits normally without timing out. I'll take a closer look, I may have overlooked something. – mshildt May 14 '13 at 13:29
  • So if the thread terminates before the subprocess within it finishes, the subprocess will terminate? That is the opposite of what unutbu says. He said "then each subprocess you spawn will live on even after your threads finish". I was also under the impression that a subprocess would continue. – b10hazard May 14 '13 at 13:36
  • 1
    unutbu is correct, however, I added a `self.process.wait()` in the thread target to cause the thread to wait for the subprocess to finish, so the thread wouldn't exit before the subprocess is finished. There is something wrong with my solution though. I'm playing with it now and it doesn't seem to be running the process correctly...still looking at it. – mshildt May 14 '13 at 13:43
  • Ok, so I forgot to add a timer.start() call in my original code. It is working now for me. – mshildt May 14 '13 at 13:54
  • Tested with "sleep 3; echo hello" with various timeouts and am seeing the expected results. I had to skip the shlex.split() though, when using shell=True in Popen(). Popen() docs suggest doing this too. – mshildt May 14 '13 at 14:00
  • @epicbrew I get similar exceptions as described in comment for unutbu solution. In some cases the self.process is None. Why? – sebast26 May 14 '13 at 14:02
  • If your timer executes before the call to subprocess.Popen() in the thread target, then self.process would still be None. I moved the timer.start() call below the thread.start() call in my code above to avoid that. Does this fix it? – mshildt May 14 '13 at 14:09
  • Ok, I just updated the code to fix/avoid any weird race conditions by moving the timer start into the thread target right after the process is created. self.process should never be None now once you get to the timer callback. – mshildt May 14 '13 at 14:17
  • @epicbrew Ok. Now you moved it to the target function, right after Popen. I change that and it seems to work (no exceptions). I will test this solution again and see what others will have to say about it ;-) – sebast26 May 14 '13 at 14:23

Use threads that start and end independently of one another. This method would be useful if you knew all the commands you wanted to run ahead of time. Here is an example...

from threading import Thread
import subprocess
import queue
import multiprocessing

class Command(object):
    def __init__(self, cmds):
        self.cmds = cmds

    def run_cmds(self):
        cmd_queue = queue.Queue()
        for cmd in self.cmds:
            cmd_queue.put(cmd)

        available_threads = multiprocessing.cpu_count()
        for x in range(available_threads):
            t = Thread(target=self.run_cmd, args=(cmd_queue,))
            t.daemon = True
            t.start()

        cmd_queue.join()


    def run_cmd(self, cmd_queue):
        while True:
            try:
                cmd = cmd_queue.get_nowait()
            except queue.Empty:
                break
            print('Thread started')
            process = subprocess.Popen(cmd, shell=True)
            process.communicate()
            print('Thread finished')
            cmd_queue.task_done()


# create the list of commands you want to run
cmds = ['cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop']
# create the class
c = Command(cmds)
# run them...
c.run_cmds()

This would print....

Thread started
Thread started
 Thread started
 Thread startedThread finished

Thread started
Thread finishedThread finished

Thread finished
Thread finished

As you can see from the output, the subprocesses start and end independently of one another, and no subprocess waits for another subprocess to finish, because they are all called in different threads. Naturally, you could add timeouts and whatever else you wanted to; this is just a simple example. This assumes you know all the commands you want to run. If you wanted to add a thread timeout, see epicbrew's answer. You could incorporate his thread timeout example into this one if you wanted to.
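For instance (a rough sketch, not part of the answer above; the 60-second default is an assumption), on Python 3.3+ the worker loop could bound each command with `wait(timeout=...)`:

```python
import queue
import subprocess

def run_cmd_with_timeout(cmd_queue, timeout=60):
    """Drain the queue, bounding each spawned command by `timeout` seconds."""
    while True:
        try:
            cmd = cmd_queue.get_nowait()
        except queue.Empty:
            break
        process = subprocess.Popen(cmd, shell=True)
        try:
            process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            process.terminate()             # timed out: kill the child
            process.wait()
        cmd_queue.task_done()
```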

b10hazard
  • Just as I have in my example? :P Although I didn't describe it as cleanly as you did. – Torxed May 14 '13 at 12:17
  • Actually, your example had process.communicate() in it when I wrote my answer. Otherwise I would not have answered. I see in the edit history that you removed it. – b10hazard May 14 '13 at 12:30
  • Yepp, but I removed it as soon as I could prior to your post, because I just pasted his code in there to get it up before my connection broke (on a train, so it disconnects every 2 min) :) – Torxed May 14 '13 at 12:48
  • Ah, I see. I ended up removing that suggestion anyway. It seems that it would be counter-productive to what the OP was trying to do. – b10hazard May 14 '13 at 13:21
import shlex
import subprocess
import threading
from random import randint
from time import time, sleep

class Worker(threading.Thread):
    def __init__(self, param, cmd, timeout=10):
        self.cmd = cmd
        self.timeout = timeout

        threading.Thread.__init__(self)
        self.name = param
    def run(self):
        startup = time()
        print(self.name + ' is starting')

        args = shlex.split(self.cmd)
        # shell should be False when given a list (True for strings)
        process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE,
                                   stdin=subprocess.PIPE, stderr=subprocess.PIPE)

        while time() - startup <= self.timeout:
            if process.poll() is not None:
                break
            sleep(0.1)                # avoid a busy-wait
        else:
            process.terminate()       # timed out: kill the child

        process.stdout.close()
        process.stdin.close()
        process.stderr.close()

        print(self.name + ' is dead')

for i in range(0, 100):
    x = Worker('Name-' + str(i), 'ping -n ' + str(randint(0, 5)) + ' www.google.se')
    x.start()

while threading.active_count() > 1:
    sleep(0.1)                        # wait for the threads to die

Could this simplify your work method? Especially considering you don't need to wait for a result, this will just launch a class object into outer space, doing the work for you with a timeout, of course.

Also note:

  • Not closing stdout, stdin and stderr will cause "Too many open files" errors on almost all systems
  • As pointed out in another answer, .communicate() waits for the process to exit (use .poll() instead)
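Since Python 3.3, `communicate()` also accepts a `timeout` argument and closes the pipes itself, which sidesteps the manual close() calls above — a small sketch (the helper name is made up for illustration):

```python
import subprocess

def capture_with_timeout(args, timeout):
    """Run args, capture its output, and kill it on timeout.
    communicate() closes stdin/stdout/stderr for us, so no file
    handles are leaked."""
    process = subprocess.Popen(args, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    try:
        out, err = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()
        out, err = process.communicate()    # collect whatever was written
    return process.returncode, out, err
```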
Torxed