
I have created a daemon thread that runs independently once it is started from the main object. I can push the functions I want it to run onto its stack. BUT I never want the daemon thread to have more than 2 functions on the stack (a design choice for the project I'm building). So if a method called run_this_function is running in this thread and the main object pushes that function onto the stack again, I want to stop run_this_function midway and then start the newly pushed function.

My question is whether there is any way to stop a sequence of statements once they have been initiated.

import threading
import time


class myThread(object):
    """ 
    The run() method will be started and it will run in the background
    until the application exits.
    """

    def __init__(self, interval=1):
        self.interval = interval
        self.thread_stack = []
        self.lock = threading.Lock()  # shared by the worker thread and any pushers
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True
        thread.start()

    def run(self):
        # Method that runs forever
        while True:
            if self.thread_stack:
                with self.lock:
                    # if a method is already running on this thread, end it.
                    new_function = self.thread_stack.pop()
                new_function()  # run the popped function
            else:
                time.sleep(self.interval)

    def some_function(self):
        #do something
        #do something
        #do something else
        #do one more thing
        pass

The above code is what I have written so far. I would create a myThread object and push the methods I want to run onto thread_stack. So if I had a function (say some_function) already running, how can I stop it midway, say after the first 2 execution statements? Am I forced to have if statements for every line?

Also, feel free to comment/critique my use of threading. I'm still very new to it. Thanks! :)

Kausik Venkat

1 Answer


Python has two standard APIs for running code concurrently: threading and multiprocessing.

Threads created using threading are difficult to kill or interrupt. This SO question details several approaches to the problem including:

  • having the thread periodically check for a stop condition (a sketch of this approach appears below)
  • raising an exception in the thread (from another thread)
  • installing a trace function (e.g. via sys.settrace) that stops the thread

In addition, have a look at this github gist and the comments for another way to deliver a signal to another thread.

All of these methods have drawbacks and may fail to work in certain situations. In particular, threads created with threading cannot be interrupted if they are in a system call.
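As an illustration of the first approach, here is a minimal sketch (not from the linked question; the class and method names are placeholders): the long-running function checks a threading.Event between steps and returns early once it is set.

import threading
import time

class InterruptibleWorker(object):
  # Illustrative only: the long-running function polls a stop flag
  # between steps, so it can be interrupted at those check points.
  def __init__(self):
    self.stop_event = threading.Event()
    self.thread = threading.Thread(target=self.do_work)
    self.thread.daemon = True
    self.thread.start()

  def do_work(self):
    for step in range(10):
      if self.stop_event.is_set():
        return          # bail out midway
      time.sleep(1)     # stand-in for one "statement" of real work

  def abort(self):
    # ask the worker to stop at its next check point
    self.stop_event.set()

The limitation is exactly the one the question runs into: the function can only stop at the points where it checks the flag, so a check is needed between every pair of statements you want to be interruptible.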

Workers created with the multiprocessing library, however, are real processes and can receive Unix signals or be killed with the terminate method. The disadvantage is that they run in a separate address space, and care must be taken when handling signals to ensure that resources are properly cleaned up.

Here is an example of how you might do this with multiprocessing. After running the program, enter a delay (such as 3) to add a new job to the queue. If the queue grows beyond size 2, the current job is killed. Enter a delay of 0 to wait for all jobs to finish.

#!/usr/bin/env python

from multiprocessing import Process
import Queue
import os
import signal
import threading
import time

class RunJobs():
  def __init__(self):
    self.queue = Queue.Queue()
    self.current_process = None

  def run_loop(self):
    # intended to be run in a separate thread
    while True:
      # atomically get the next job
      job = self.queue.get(True, None)
      if job is None:
        break
      p = Process(target=run_job, args=job)
      self.current_process = p
      p.start()
      p.join()
      self.current_process = None

  def abort(self): # abort the current process
    p = self.current_process
    if p:
      p.terminate()
      # alternatively, use os.kill(p.pid, ...) to send a signal

  def add_job(self, job):
    # add the job, calling self.abort() if there are
    # too many jobs on the queue.
    # N.B.: For illustrative purposes only. There is
    # a race condition here; to avoid it, use locks.
    self.queue.put(job)
    if self.queue.qsize() > 2: self.abort()

def run_job(delay, message):
  for i in xrange(5):
    print  "\n ===", message, "i =", i
    time.sleep(delay)

def main():
  rj = RunJobs()
  t = threading.Thread(target=rj.run_loop)
  t.start()

  i = 1
  while True:
    delay = raw_input("Enter delay for job {}: ".format(i))
    delay = int(delay)
    if delay == 0:
      rj.add_job(None)
      break
    job = ( delay, "job {} - delaying for {}".format(i, delay) )
    rj.add_job(job)
    i += 1
  print "Waiting for all jobs to finish..."
  t.join()

if __name__ == '__main__':
  main()
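Regarding the race condition noted in add_job: one way to make the size check and abort atomic is to guard them with a lock. A minimal sketch (the RunJobsLocked subclass is hypothetical, not part of the answer above):

import threading

class RunJobsLocked(RunJobs):
  # Illustrative variant: producers take a lock so that putting a job,
  # checking the queue size, and aborting happen as one atomic step.
  def __init__(self):
    RunJobs.__init__(self)
    self.lock = threading.Lock()

  def add_job(self, job):
    with self.lock:
      self.queue.put(job)
      if self.queue.qsize() > 2:
        self.abort()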
ErikR
  • hmm thanks for the code. It really helps explain multiprocessing, but I was actually working on implementing one thread that never stops (a while-true loop) and adding functions to run through a global stack (thread_stack). I wanted to know how to end those functions without closing the thread. But I figured that out already. Thanks!! – Kausik Venkat Jul 18 '16 at 09:10
  • If you figured that out you should post it as an answer. – ErikR Jul 18 '16 at 09:16