
Does

import multiprocessing
import schedule


def worker():
    # do some stuff
    pass


def sched(argv):
    schedule.every(0.01).minutes.do(worker)          
    while True:
        schedule.run_pending()


processs = []
..
..
p = multiprocessing.Process(target=sched, args=(argv,))
..
..
processs.append(p)

for p in processs:
    p.terminate()

kill a list of processes gracefully?

If not, what is the simplest way to do it?

The goal is to reload the configuration file into memory, so I would like to kill all child processes and create new ones instead; the new processes will read the new config file.

Edit: Added more code to show that I am running a `while True` loop.

Edit: This is the new code after @dano's suggestion:

def get_config(self):
    from ConfigParser import SafeConfigParser
    ..
    return argv

def sched(self, args, event):
    # schedule instruction:
    schedule.every(0.01).minutes.do(self.worker, args)
    while not event.is_set():
        schedule.run_pending()

def dispatch_processs(self, conf):
    processs = []
    event = multiprocessing.Event()

    for conf in self.get_config():
        process = multiprocessing.Process(target=self.sched, args=(i for i in conf), kwargs={'event': event})
        processs.append((process, event))
    return processs

def start_process(self, process):
    process.start()

def gracefull_process(self, process):
    process.join()

def main(self):
    while True:
        processs = self.dispatch_processs(self.get_config())
        print("%s processes running " % len(processs))

        for process, event in processs:
            self.start_process(process)
            time.sleep(1)
            event.set()
            self.gracefull_process(process)

The good thing about this code is that I can edit the config file and the process will reload its config as well.

The problem is that only the first process runs and the others are ignored.

Edit: This saved my life. Working with a `while True` loop around `schedule.run_pending()` is not a good idea, so I set up a `refresh_time` instead:

def sched(self, args, event):
    schedule.every(0.01).minutes.do(self.worker, args)
    for i in range(refresh_time):
        schedule.run_pending()
        time.sleep(1)

def start_processs(self, processs):
    for p, event in processs:
        if not p.is_alive():
            p.start()
        time.sleep(1)
        event.set()

    self.gracefull_processs(processs)

def gracefull_processs(self, processs):
    for p, event in processs:
        p.join()
    processs = self.dispatch_processs(self.get_config())
    self.start_processs(processs)

def main(self):
    while True:
        processs = self.dispatch_processs(self.get_config())

        self.start_processs(processs)
        break
    print("Reloading function main")
    self.main()
4m1nh4j1

3 Answers


If you don't mind only aborting after `worker` has completed all of its work, it's very simple to add a `multiprocessing.Event` to handle exiting gracefully:

import multiprocessing
import schedule


def worker():
    # do some stuff
    pass

def sched(argv, event=None):
    schedule.every(0.01).minutes.do(worker)          
    while not event.is_set():  # Run until we're told to shut down.
        schedule.run_pending()

processes = []
..
..
event = multiprocessing.Event()
p = multiprocessing.Process(target=sched, args=(argv,), kwargs={'event': event})
..
..
processes.append((p, event))

# Tell all processes to shut down
for _, event in processes:
    event.set()

# Now actually wait for them to shut down
for p, _ in processes:
    p.join()
dano
  • Thanks, it works, but I added `for p, _ in processs: p.start()` and added `event` to the list of args in `sched()`. The program launched and exited right after that. Practically, how could `event` help me to kill and reload all the processes? – 4m1nh4j1 Oct 29 '14 at 15:56
  • @4m1nh4j1 Well, you would probably want to move the code that launches the processes into a function. Then, when you've got some config file changes to push, you call `event.set()` for all the processes, wait for them to exit, then re-run the process launching script. And actually, depending on how the config file gets read, you might be able to use the `event` to simply tell each worker to re-read the config file, rather than having to kill them altogether (see the sketch after these comments). – dano Oct 29 '14 at 15:59
  • Thanks, I made an edit to my code because I solved the problem partially, but I still have some problems. – 4m1nh4j1 Oct 30 '14 at 12:11
  • @4m1nh4j1 You need to use a different `event` for each `Process` you start. Move the `event = multiprocessing.Event()` line inside the `for conf in self.get_config():` loop. – dano Oct 30 '14 at 14:16
  • I found a solution (cf. update). I am not sure I used your last comment, but I used your answer from the beginning to understand and solve the problem. Thanks a lot for your time. – 4m1nh4j1 Oct 30 '14 at 14:39
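
A minimal sketch of the re-read alternative mentioned in the comments above, using illustrative names (`load_config`, `reload_event`) that are not part of the original code: each worker keeps its own `Event`, reloads the configuration when the event is set, and clears it again afterwards.

import multiprocessing
import time


def load_config():
    # placeholder: parse and return the real config file here
    return {}


def worker(reload_event):
    config = load_config()
    for _ in range(5):  # bounded loop so this sketch terminates
        if reload_event.is_set():
            config = load_config()  # pick up the edited config file
            reload_event.clear()    # wait for the next reload request
        # ... do the scheduled work with `config` ...
        time.sleep(1)


if __name__ == '__main__':
    reload_event = multiprocessing.Event()
    p = multiprocessing.Process(target=worker, args=(reload_event,))
    p.start()
    time.sleep(2)
    reload_event.set()  # ask the worker to re-read the config instead of killing it
    p.join()

This avoids tearing processes down at all; whether it applies depends on how the worker actually consumes the config.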

A: No, both `.terminate()` and `SIG_*` methods are rather brutal.

To arrange a graceful end of any process, as described in your post, there should rather be some "soft-signalling" layer that lets both ends send and receive smart signals without depending on the O/S interpretation of them (the O/S knows nothing about your application-level context or the state of the work-unit that is currently being processed).

You may want to read about such a soft-signalling approach in the links referred to from https://stackoverflow.com/a/25373416/3666197
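
Purely as an illustration of the idea (none of the names below come from the linked answer), such a soft-signalling layer can be as small as a `multiprocessing.Queue` carrying application-level control messages that the worker inspects between work units, so it stops at a point it chooses rather than wherever the O/S cuts it off:

import multiprocessing
import time


def do_one_unit_of_work():
    # stand-in for the real work unit (e.g. one scheduled job)
    time.sleep(0.5)


def worker(control_queue):
    while True:
        if not control_queue.empty():
            msg = control_queue.get()
            if msg == 'shutdown':
                # finish gracefully: no work unit is interrupted half-way
                break
        do_one_unit_of_work()


if __name__ == '__main__':
    control_queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(control_queue,))
    p.start()
    time.sleep(2)
    control_queue.put('shutdown')  # soft-signal: ask, don't kill
    p.join()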

user3666197

No, it doesn't kill a process gracefully by your own definition, unless you take some additional steps.

Assuming you're using a Unix system (since you mentioned `scp`), `terminate` sends a `SIGTERM` signal to the child process. You can catch this signal in the child process and act accordingly (wait for `scp` to finish):

import signal

def on_terminate(signum, stack):
    wait_for_current_scp_operation()

signal.signal(signal.SIGTERM, on_terminate)

Here's a tutorial about handling and sending signals
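
To tie the pieces together, here is a self-contained sketch (with a hypothetical `do_transfer` standing in for the scp operation): the child only notes that termination was requested, and exits after the current transfer has finished.

import multiprocessing
import signal
import time


def do_transfer():
    # stand-in for one scp operation
    time.sleep(1)


def child():
    shutdown_requested = []

    def on_terminate(signum, stack):
        # remember the request; the loop below finishes the current transfer first
        shutdown_requested.append(True)

    signal.signal(signal.SIGTERM, on_terminate)

    while not shutdown_requested:
        do_transfer()


if __name__ == '__main__':
    p = multiprocessing.Process(target=child)
    p.start()
    time.sleep(2)
    p.terminate()  # sends SIGTERM on Unix; the handler turns it into a soft request
    p.join()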

loopbackbee