3

I'm writing a python program (in Linux, on a Raspberry Pi) to run as a daemon (using python-daemon) and I understand the 'runner' component will soon be deprecated.

For this reason, I'd like for the daemon to react to signals - for which I intend on using SIGUSR1 and SIGUSR2.

However, I'd like for it to react to more than 2 signals.

How can I create, receive and react to custom signals - i.e. SIGUSR3?

This has been reposted from software engineering stackexchange as it was deemed 'off-topic'.

Community
  • 1
  • 1
Ryan Hunt
  • 76
  • 1
  • 3

2 Answers2

-1

Use the signal module. From its documentation https://docs.python.org/2/library/signal.html , an example (make sure that an open does not hang forever, instead convert an alarm signal into a Python IOError exception)

import signal, os

def handler(signum, frame):
    print 'Signal handler called with signal', signum
    raise IOError("Couldn't open device!")

# Set the signal handler and a 5-second alarm
signal.signal(signal.SIGALRM, handler)
signal.alarm(5)

# This open() may hang indefinitely
fd = os.open('/dev/ttyS0', os.O_RDWR)

signal.alarm(0)          # Disable the alarm
nigel222
  • 7,582
  • 1
  • 14
  • 22
  • 3
    This is not what was asked... op asked how to create custom signals... This answer shows how to handle SIGALARM but not how to create custom signals. – Or Groman Feb 18 '20 at 12:13
-1
    import queue
    import threading
    import time

    class Singleton(type):
        __instance = None

        __instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


    class SignalManager(metaclass=Singleton):
        sig_map = {}
        asynq = queue.Queue()

    def __init__(self):
        t = threading.Thread(target=self.__listen)
        t.daemon = True
        t.start()

    def __listen(self):
        while True:
            if self.asynq.empty():
                time.sleep(3)
                continue

            signal, args, kwargs = self.asynq.get()
            self.emit(signal, *args, **kwargs)

    def connect(self, signal, slot):
        '''
        Connect signal with slot to receive message

        :param signal:
        :param slot:
        :return:
        '''
        if signal not in self.sig_map.keys():
            self.sig_map[signal] = []
        self.sig_map[signal].append(slot)

    def disconnect(self, signal, slot):
        '''
        Disconnect signal message

        :param signal:
        :param slot:
        :return:
        '''
        if signal in self.sig_map.keys():
            if slot in self.sig_map[signal]:
                self.sig_map[signal].remove(slot)

    def emit(self, signal, *args, **kwargs):
        '''
        Synchronous emission

        :param signal:
        :param args:
        :param kwargs:
        :return:
        '''
        if signal in self.sig_map.keys():
            for s in self.sig_map[signal]:
                try:
                    s(*args, **kwargs)
                except Exception as e:
                    print(e)

    def amit(self, signal, *args, **kwargs):
        '''
        Asyncrhonous emission. Immediately return. No context hang.

        :param signal:
        :param args:
        :param kwargs:
        :return:
        '''
        self.asynq.put([signal, args, kwargs])

    def nmit(self, signal, *args, **kwargs):
        '''
        N thread asynchronus emission. Immediately return. No context hang.

        :param signal:
        :param args:
        :param kwargs:
        :return:
        '''
        t = threading.Thread(target=lambda: self.emit(signal, *args, **kwargs))
        t.daemon = True
        t.start()
        # t.join()


sigmgr = SignalManager()


def slot_no_args():
    print('received!!!!')


def slot_sig1(**kwargs):
    print(kwargs)


def slot_sig2(*args, **kwargs):
    print(args)
    print(kwargs)
    time.sleep(1)


if __name__ == '__main__':
    sigmgr.connect('korea', slot_no_args)
    sigmgr.emit('korea')

    sigmgr.connect('test', slot_sig1)
    sigmgr.emit('test', data='hello')

    sigmgr.connect('test2', slot_sig2)

    # one thread async > time delay 1 sec by each call
    sigmgr.amit('test2', (1, 2, 3, 4,), data='hello')
    sigmgr.amit('test2', (1, 2, 3, 4,), data='hello')
    sigmgr.amit('test2', (1, 2, 3, 4,), data='hello')
    print('aync emit !')
    time.sleep(10)

    # n thread async > output right away all call
    sigmgr.nmit('test2', (1,2,3,4,), data='hello')
    sigmgr.nmit('test2', (1, 2, 3, 4,), data='hello')
    sigmgr.nmit('test2', (1, 2, 3, 4,), data='hello')
    print('n thread aync emit !')
    time.sleep(10)
    exit(0)
  • What about adding a few words of explanation on how this code is supposed to solve the problem described in the question ? Keep in mind that answers that contain an explanation are considered to be of higher quality and are more likely to attract upvotes. – mario Apr 09 '20 at 08:56
  • Just run it. You can find all you want right away in simple ways. – Kijung Kim Apr 13 '20 at 00:21