-1

I would like to add a keyboard detection for Linux to my existing Keyboard Detector for Windows. So I used pyudev to create a LinuxKeyboardDetector.

The script can be started and the graphical user interface appears, but unfortunately the keyboard detection does not recognize anything and does not report any error.

I suspect that there is a problem with multithreading using QRunnable.

Code

import sys
from datetime import datetime
import platform

from PyQt5. QtCore import QObject, QRunnable, QThreadPool, pyqtSignal
from PyQt5.QtWidgets import QApplication, QMainWindow, QTableWidget, QTableWidgetItem, QHeaderView


current_platform = platform.system()
if current_platform == "Windows":
    import pythoncom
    import wmi
elif current_platform == "Linux":
    import pyudev
    from pyudev.pyqt5 import MonitorObserver


def create_keyboard_detector():
    keyboard_detector = None
    if current_platform == "Windows":
        keyboard_detector = WindowsKeyboardDetector()
    elif current_platform == "Linux":
        keyboard_detector = LinuxKeyboardDetector()
    return keyboard_detector


class KeyboardDetectorSignals(QObject):
    keyboard_changed = pyqtSignal(str)


class WindowsKeyboardDetector(QRunnable):
    def __init__(self):
        super().__init__()

        self.signals = KeyboardDetectorSignals()

    def run(self):
        pythoncom.CoInitialize()
        device_connected_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA \'Win32_Keyboard\'"
        device_disconnected_wql = "SELECT * FROM __InstanceDeletionEvent WITHIN 2 WHERE TargetInstance ISA \'Win32_Keyboard\'"

        c = wmi.WMI()
        connected_watcher = c.watch_for(raw_wql=device_connected_wql)
        disconnected_watcher = c.watch_for(raw_wql=device_disconnected_wql)

        while True:
            try:
                connected = connected_watcher(timeout_ms=10)
            except wmi.x_wmi_timed_out:
                pass
            else:
                if connected:
                    self.signals.keyboard_changed.emit("Keyboard connected.")

            try:
                disconnected = disconnected_watcher(timeout_ms=10)
            except wmi.x_wmi_timed_out:
                pass
            else:
                if disconnected:
                    self.signals.keyboard_changed.emit("Keyboard disconnected.")


class LinuxKeyboardDetector(QRunnable):
    def __init__(self):
        super().__init__()
        self.signals = KeyboardDetectorSignals()
        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        self.observer = MonitorObserver(self.monitor)

    def run(self):
        self.monitor.filter_by(subsystem="usb", device_type="usb_device")
        self.observer.deviceEvent.connect(self.process_device_event)
        self.monitor.start()

    def process_device_event(self, device):
        if device['ID_INPUT_KEYBOARD'] == '1':
            if device.action == "add":
                self.signals.keyboard_changed.emit("Keyboard connected.")
            if device.action == "remove":
                self.signals.keyboard_changed.emit("Keyboard disconnected.")


class MainWindow(QMainWindow):

    def __init__(self):
        super().__init__()

        self.setGeometry(100, 100, 500, 500)
        self.setWindowTitle("Keyboard Logger")

        self.log_table = QTableWidget()
        self.log_table.setColumnCount(2)
        self.log_table.setShowGrid(True)
        self.log_table.setHorizontalHeaderLabels(["Time", "Event"])
        self.log_table.horizontalHeader().setStretchLastSection(True)
        self.log_table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
        self.setCentralWidget(self.log_table)
        self.show()

        self.threadpool = QThreadPool()
        keyboard_detector = create_keyboard_detector()
        keyboard_detector.signals.keyboard_changed.connect(self.add_row)
        self.threadpool.start(keyboard_detector)

    def add_row(self, event: str):
        now = datetime.now()
        datetime_string = now.strftime("%Y-%m-%d %H:%M:%S")

        row_count = self.log_table.rowCount()
        self.log_table.insertRow(row_count)
        self.log_table.setItem(row_count, 0, QTableWidgetItem(datetime_string))
        self.log_table.setItem(row_count, 1, QTableWidgetItem(event))


def main():
    app = QApplication(sys.argv)
    window = MainWindow()
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()

Edit 1: Update LinuxKeyboardDetector class to use basic pyudev.MonitorObserver, instead of the dedicated pyqt version.

class LinuxKeyboardDetector(QRunnable):
    def __init__(self):
        super().__init__()
        self.signals = KeyboardDetectorSignals()
        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        # self.observer = MonitorObserver(self.monitor)
        self.observer = pyudev.MonitorObserver(self.monitor, self.process_device_event)

    def run(self):
        self.monitor.filter_by(subsystem="usb", device_type="usb_device")
        # self.observer.deviceEvent.connect(self.process_device_event)
        # self.monitor.start()
        self.observer.start()

    def process_device_event(self, device):
        if device['ID_INPUT_KEYBOARD'] == '1':
            if device.action == "add":
                self.signals.keyboard_changed.emit("Keyboard connected.")
            if device.action == "remove":
                self.signals.keyboard_changed.emit("Keyboard disconnected.")

Result 1: The following error message appears when a USB keyboard is plugged in or off.

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/ata/source/venv/lib/python3.6/site-packages/pyudev/monitor.py", line 532, in run
    self._callback(device)
  File "/home/ata/source/venv/lib/python3.6/site-packages/pyudev/monitor.py", line 508, in <lambda>
    callback = lambda d: event_handler(d.action, d)
TypeError: process_device_event() takes 2 positional arguments but 3 were given
Atalanttore
  • 349
  • 5
  • 22
  • @eyllanesc I assume you mean `keyboard_detector` in the `__init__()` method of the `MainWindow` class. If so, I have to inform you that a change to `self.keyboard_detector` unfortunately did not fix the problem. – Atalanttore Jul 19 '20 at 00:40
  • 1
    Why not use package like `pynput`. It is cross platform both async and sync keyboard and mouse monitors/controllers – Prayson W. Daniel Aug 04 '20 at 14:20
  • @PraysonW.Daniel Thank you very much for the hint. I did not know the `pynput` library yet. – Atalanttore Aug 05 '20 at 17:46

1 Answers1

1

According to this answer, you have to start the monitor before the Qt app event loop. In this case, you must not use a QRunnable at all, as the monitor will work as a standard QObject, working asynchronously and sending signals whenever required.

If you still want to keep the same interface, using QRunnable, I think that the only solution is to use the basic pyudev.MonitorObserver, instead of the dedicated pyqt version.

class LinuxKeyboardDetector(QRunnable):
    def __init__(self):
        super().__init__()
        self.signals = KeyboardDetectorSignals()
        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        self.observer = pyudev.MonitorObserver(self.monitor, self.process_device_event)

    def run(self):
        self.monitor.filter_by(subsystem="usb", device_type="usb_device")
        self.observer.start()
musicamante
  • 41,230
  • 6
  • 33
  • 58
  • It would be great to keep the interface the same. Unfortunately, an error message appears with your suggested solution, see Result 1. – Atalanttore Aug 02 '20 at 18:25
  • 1
    That's probably due to the argument signature, if I recall correctly, pyudev's MonitorObserver uses two arguments, with the first being the action type and the second the device. You can keep the same interface by changing to `process_device_event(self, *args):` and then get the device using `args[-1]`. – musicamante Aug 02 '20 at 18:51
  • Except for checking whether the USB device is a keyboard (`KeyError: 'ID_INPUT_KEYBOARD'`), it now works, thanks! – Atalanttore Aug 05 '20 at 17:43
  • 1
    @Atalanttore I didn't include that portion as I thought you already figured it out and you just provided minimal code. Since you're actually getting a dictionary, just ensure to go further only `if 'ID_INPUT_KEYBOARD' in device`. – musicamante Aug 05 '20 at 21:39
  • Thank you for your tip, but I have already failed once in checking whether the USB device is a keyboard because `libudev` does not provide the appropriate functionality. See https://github.com/pyudev/pyudev/issues/361 – Atalanttore Aug 09 '20 at 13:13