44

I have a program that may have a lengthy execution. In the main module I have the following:

import signal
def run_program()
   ...time consuming execution...

def Exit_gracefully(signal, frame):
    ... log exiting information ...
    ... close any open files ...
    sys.exit(0)

if __name__ == '__main__':
    signal.signal(signal.SIGINT, Exit_gracefully)
    run_program()

This works fine, but I'd like the possibility to pause execution upon catching SIGINT, prompting the user if they would really like to quit, and resuming where I left off in run_program() if they decide they don't want to quit.

The only way I can think of doing this is running the program in a separate thread, keeping the main thread waiting on it and ready to catch SIGINT. If the user wants to quit the main thread can do cleanup and kill the child thread.

Is there a simpler way?

Raidri
  • 17,258
  • 9
  • 62
  • 65
Colin M
  • 441
  • 1
  • 4
  • 3

4 Answers4

76

The python signal handlers do not seem to be real signal handlers; that is they happen after the fact, in the normal flow and after the C handler has already returned. Thus you'd try to put your quit logic within the signal handler. As the signal handler runs in the main thread, it will block execution there too.

Something like this seems to work nicely.

import signal
import time
import sys

def run_program():
    while True:
        time.sleep(1)
        print("a")

def exit_gracefully(signum, frame):
    # restore the original signal handler as otherwise evil things will happen
    # in raw_input when CTRL+C is pressed, and our signal handler is not re-entrant
    signal.signal(signal.SIGINT, original_sigint)

    try:
        if raw_input("\nReally quit? (y/n)> ").lower().startswith('y'):
            sys.exit(1)

    except KeyboardInterrupt:
        print("Ok ok, quitting")
        sys.exit(1)

    # restore the exit gracefully handler here    
    signal.signal(signal.SIGINT, exit_gracefully)

if __name__ == '__main__':
    # store the original SIGINT handler
    original_sigint = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, exit_gracefully)
    run_program()

The code restores the original signal handler for the duration of raw_input; raw_input itself is not re-entrable, and re-entering it will lead to RuntimeError: can't re-enter readline being raised from time.sleep which is something we don't want as it is harder to catch than KeyboardInterrupt. Rather, we let 2 consecutive Ctrl-C's to raise KeyboardInterrupt.

  • 2
    Wow! This is really cool. However, I think that the ability to crtl-c out of the prompt should be in a decorator, as it makes the code much less mystifying. Would it be appropriate to provide that way as an answer (as I can't edit you answer to add this alternative way)? – mr2ert Aug 08 '13 at 00:09
  • No, decorator would make it more mystifying, and the signal handler setting is actually part of the logic in the function. Maybe I just add some comments :D – Antti Haapala -- Слава Україні Aug 08 '13 at 00:14
  • Decorators are a little magical... I'll keep my decorator version in the module I'm writing for this (because it is so sweet :) ). I'd make a note though that you could do this without changing the signal handler, and only exit if the user enters `y` into the prompt. – mr2ert Aug 08 '13 at 00:27
  • Cool, this works. I do have an issue running your code Antti where exiting the signal handler causes IOERROR: Interruped function call. Only when I set the sleep to be 0.001 seconds does it work, it breaks at 0.01 or higher. This is also only an issue on windows as far as I can tell, the code runs fine as is within Cygwin. – Colin M Aug 08 '13 at 18:45
  • By exiting I don't mean the call to sys.exit(). Just realized the potential ambiguity. It apparently doesn't like trying to resume the call to sleep() after returning from the signal handler. – Colin M Aug 08 '13 at 19:02
  • Ah, Windows. Windows **does not even have signals**, so how could one suppose the emulation works perfectly. It seems that if CTRL-C is caught by signal handler, then the time.sleep throws IOError? Notice. too, that it is documented that time.sleep might return earlier if a signal is delivered (possibly raising an exception) – Antti Haapala -- Слава Україні Aug 09 '13 at 00:28
  • 2
    The strategy doesn't work if we have multithreaded application. – Ciasto piekarz Jun 21 '14 at 13:10
  • why do you need store the original SIGINT handler ? plus I don't think its graceful exit I'd rather call it an ungraceful exit !! – Ciasto piekarz Jun 21 '14 at 18:15
  • @san bc signal handlers are not reentrable, and I want to run the original handler if the ctrl-c is pressed again. – Antti Haapala -- Слава Україні Jun 22 '14 at 09:27
  • 1
    @san also, the signal should sort of mostly work in multithreaded application that has 1 main thread handling all the input, but the signal handler must be set in that main thread... – Antti Haapala -- Слава Україні Jun 22 '14 at 09:28
  • @AnttiHaapala - what would happen exactly if signal handler was not re-entrant? I.e. if line `signal.signal(signal.SIGINT, original_sigint)` of the function `exit_gracefully` was commented out? – Matteo Aug 18 '16 at 01:08
  • @AnttiHaapala - Also, would it be possible to modify this program to completely ignore signal handlers? In a way that only killing the process would stop the program? – Matteo Aug 18 '16 at 01:10
  • @Matteo the posix signal-handlers if registered by `signal.signal` are one-shot, so if it wasn't reregistered, the next ctrl-c would throw `Keyboardinterrupt` – Antti Haapala -- Слава Україні Aug 18 '16 at 04:25
  • There are a couple ways to ignore ctrl-c in POSIX - You can make the terminal ignore ctrl-c, you can make the program to not have a controlling terminal, or you can ignore the signal; so you should be simply able to use `signal.signal(signal.SIGINT, signal.SIG_IGN)` to block the ctrl-c completely. – Antti Haapala -- Слава Україні Aug 18 '16 at 04:30
  • What are the evil things that could happen in `raw_input`, and why isn't catching `KeyboardInterrupt` enough? Why is it important to restore the original sigint handler when handling sigint in `exit_gracefully`? – so.very.tired Sep 03 '16 at 11:51
  • Where exactly is `original_sigint` being passed to the function? – HumbleBee Jan 20 '22 at 09:55
6

from https://gist.github.com/rtfpessoa/e3b1fe0bbfcd8ac853bf

#!/usr/bin/env python

import signal
import sys

def signal_handler(signal, frame):
  # your code here
  sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

Bye!

Marc
  • 77
  • 2
  • 2
0

when procedure end then do something

suppose you just want to the procedure will do something after the task end

import time

class TestTask:
    def __init__(self, msg: str):
        self.msg = msg

    def __enter__(self):
        print(f'Task Start!:{self.msg}')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('Task End!')

    @staticmethod
    def do_something():
        try:
            time.sleep(5)
        except:
            pass

with TestTask('Hello World') as task:
    task.do_something()

when the process leaves with that will run __exit__ even with KeyboardInterrupt happen that are same.

if you don't like to see the error, add try ... except ...

@staticmethod
def do_something():
    try:
        time.sleep(5)
    except:
        pass

pause, continue, reset, and etc.

I don't have a perfect solution, but it may be useful to you.

It's means divided your process to many subprocesses and save it that finished.it will not be executed again since you find it already done.

import time
from enum import Enum

class Action(Enum):
    EXIT = 0
    CONTINUE = 1
    RESET = 2

class TestTask:
    def __init__(self, msg: str):
        self.msg = msg

    def __enter__(self):
        print(f'Task Start!:{self.msg}')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('Task End!')

    def do_something(self):
        tuple_job = (self._foo, self._bar)  # implement by yourself
        list_job_state = [0] * len(tuple_job)
        dict_keep = {}  # If there is a need to communicate between jobs, and you don’t want to use class members, you can use this method.
        while 1:
            try:
                for idx, cur_process in enumerate(tuple_job):
                    if not list_job_state[idx]:
                        cur_process(dict_keep)
                        list_job_state[idx] = True
                if all(list_job_state):
                    print('100%')
                    break
            except KeyboardInterrupt:
                print('KeyboardInterrupt. input action:')
                msg = '\n\t'.join([f"{action + ':':<10}{str(act_number)}" for act_number, action in
                                   enumerate([name for name in vars(Action) if not name.startswith('_')])
                                   ])
                case = Action(int(input(f'\t{msg}\n:')))
                if case == Action.EXIT:
                    break
                if case == Action.RESET:
                    list_job_state = [0] * len(tuple_job)

    @staticmethod
    def _foo(keep_dict: dict) -> bool:  # implement by yourself
        time.sleep(2)
        print('1%')
        print('2%')
        print('...')
        print('60%')
        keep_dict['status_1'] = 'status_1'
        return True

    @staticmethod
    def _bar(keep_dict: dict) -> bool:  # implement by yourself
        time.sleep(2)
        print('61%')
        print(keep_dict.get('status_1'))
        print('...')
        print('99%')
        return True

with TestTask('Hello World') as task:
    task.do_something()

console

input action number:2
Task Start!:Hello World
1%
2%
...
60%
KeyboardInterrupt. input action:
        EXIT:     0
        CONTINUE: 1
        RESET:    2
:1
61%
status_1
...
99%
100%
Task End!

Carson
  • 6,105
  • 2
  • 37
  • 45
0

Code

import signal
import time

flag_exit = False


def signal_handler(signal, frame):
    if input("  Ctrl+C detected. Do you really want to exit? y/n > ").lower().startswith('y'):
        global flag_exit
        flag_exit = True
        print("Wait for graceful exit...")


signal.signal(signal.SIGINT, signal_handler)


def get_time():
    from datetime import datetime
    now = datetime.now()
    dt_string = now.strftime("%Y-%m-%d %H:%M:%S")
    return dt_string


def process():
    for i in range(999):
        if flag_exit:
            break
        print(f"[{get_time()}] start process: {i}")
        time.sleep(5)
        print(f"[{get_time()}] end process: {i}")
        print()


if __name__ == "__main__":
    process()

Output

[2023-07-11 10:42:21] start process: 0
[2023-07-11 10:42:26] end process: 0

[2023-07-11 10:42:26] start process: 1
^C  Ctrl+C detected. Do you really want to exit? y/n > n
[2023-07-11 10:42:31] end process: 1

[2023-07-11 10:42:31] start process: 2
^C  Ctrl+C detected. Do you really want to exit? y/n > y
Wait for graceful exit...
[2023-07-11 10:42:36] end process: 2
LogWell
  • 45
  • 7