In my pyqt GUI application I need to read Keyboard input from one thread and do some periodic tasks in another thread, both will interact with GUI. I am using QThreadPool
for for multi-threading. But, I have observed that, if thread 1 is waiting at input()
for reading string from Keyboard, other thread getting blocked in it's loop.
Is there any Pyqt friendly alternative for Python's input()
function?
Note: I have read about keypressevent
signal, but it is mostly about reading single key. The input put keyboard will be later replaced with Barcode reader which emulated USB keyboard, it will sent all characters in single shot.
My striped down code has given below. I am inheriting class Ui_DockWidget()
from imported library created by QT designer.
from datetime import datetime
import os, errno
import time
import io, re
import sqlite3
import subprocess
import atexit
import matrixux
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5 import QtCore, QtGui, QtWidgets
global matrix_db
from PyQt5.QtGui import QPainter, QColor, QPen
from PyQt5.QtCore import QTimer,QDateTime
import socket
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
def exit_handler():
global matrix_db
print("App exiting!")
matrix_db.close()
class matrix_gui(QtWidgets.QDockWidget):
def __init__(self):
super(matrix_gui, self).__init__()
self.ui = matrixux.Ui_DockWidget()
self.ui.setupUi(self)
self.timer = QtCore.QTimer(self)
self.timer.setInterval(1000)
self.thread = app_thread()
self.thread2 = timer_thread()
host='Jig ID:'+socket.gethostname()
self.ui.tot_counts.setText(host)
self.threadpool = QThreadPool()
print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())
self.worker = Worker()
self.timerthread =TimerThread()
self.worker.signals.change_value_matrix.connect(self.set_MATRIX_EDIT)
self.worker.signals.change_value_cartridge.connect(self.set_CARTRIDGE_EDIT)
self.worker.signals.change_value_count.connect(self.set_CARTRIDGE_COUNT)
self.worker.signals.change_value_errors.connect(self.set_ERRORS)
self.timerthread.signals.update_time.connect(self.updateclock)
self.threadpool.start(self.worker)
self.threadpool.start(self.timerthread)
@QtCore.pyqtSlot()
def updateclock(self):
self.ui.tot_counts.setText(datetime.now().strftime("%d/%m/%Y - %H:%M:%S"))
print("timer updated\n")
@QtCore.pyqtSlot(str)
def set_MATRIX_EDIT(self, text):
try:
self.ui.MATRIX_EDIT.setText(text)
except Exception as e:
print(e)
@QtCore.pyqtSlot(str)
def set_CARTRIDGE_EDIT(self, text):
try:
self.ui.CARTRIDGE_EDIT.setText(text)
except Exception as e:
print(e)
@QtCore.pyqtSlot(str)
def set_CARTRIDGE_COUNT(self, text):
try:
self.ui.CARTRIDGE_COUNT.setText(text)
except Exception as e:
print(e)
@QtCore.pyqtSlot(str)
def set_ERRORS(self, text):
try:
self.ui.ERRORS.setText(text)
except Exception as e:
print(e)
class TimerThreadSignals(QObject):
update_time=pyqtSignal()
class TimerThread(QRunnable):
def __init__(self):
super(TimerThread, self).__init__()
self.signals = TimerThreadSignals()
def run(self):
while True:
time.sleep(1)
self.signals.update_time.emit()
print('rrr\n')
class WorkerSignals(QObject):
change_value_matrix=pyqtSignal(str)
change_value_cartridge=pyqtSignal(str)
change_value_count=pyqtSignal(str)
change_value_errors=pyqtSignal(str) #self.update_time.emit()
class Worker(QRunnable):
def __init__(self):
super(Worker, self).__init__()
self.signals = WorkerSignals()
@pyqtSlot()
def run(self):
global matrix_db
prev_cat='-'
prev_matrix='-'
prev_qr='-'
matrix='-'
matrix_db = sqlite3.connect('Matrix.db')
count=0
totmatrix=0
totcat=0
time.sleep(2)
while True:
try:
if matrix == '-':
print("Scan Matrix Pack1")
self.signals.change_value_matrix.emit("Scan Matrix Pack")
#matrix_gui().set_MATRIX_EDIT("Scan Matrix Pack")
qr=input()
print (qr)
self.signals.change_value_matrix.emit("Scan Matrix Pack")
except Exception as e:
print("Exception:"+datetime.now().strftime("%Y/%m/%d-%H:%M:%S"))
print(e)
if __name__ == "__main__":
import sys
atexit.register(exit_handler)
app = QtWidgets.QApplication(sys.argv)
MainWindow = matrix_gui()
MainWindow.show()
sys.exit(app.exec_())