I am working on a simple PyQt5 app that connects to a remote host and installs a package on it using SNMP and SSH.
Basically, the user enters the IP address and presses the start button, and the magic happens! My script works fine, but the GUI freezes while the main function runs because I don't use threading. It freezes when it reaches the "status_copy" part of the start_process function.
I've tried to implement threading, but I ran into a lot of issues and can't find a way to solve them. Here is the part of the code for the thread worker that I wrote.
class WorkerThread(QtCore.QThread):
    """Thread that runs ``start_process`` outside the GUI thread.

    ``QThread.start()`` executes ``run()`` in the new thread and emits
    ``finished`` when ``run()`` returns.

    Args:
        target_ui: UI object handed to ``start_process``. When omitted, the
            module-level ``ui`` is used so ``WorkerThread()`` keeps working.
        parent: Optional Qt parent object.
    """

    def __init__(self, target_ui=None, parent=None):
        super().__init__(parent)
        self._target_ui = target_ui

    def run(self):
        # NOTE(review): start_process still updates widgets directly
        # (setProperty / setEnabled). Qt only supports widget access from the
        # GUI thread, so those updates should be routed through signals.
        start_process(ui if self._target_ui is None else self._target_ui)


def test_thread(ui):
    """Start the deployment in a background thread.

    The start button is disabled while the worker runs and enabled again
    once the worker has finished.

    Args:
        ui: UI object; ``ui.worker`` (and ``ui.thread``) are set to the
            running thread so it is not garbage-collected while running.
    """
    ui.button_start.setEnabled(False)

    # A QThread subclass is started directly. Moving it into a second QThread
    # and calling run() as a slot never emits ``finished``, so the cleanup
    # connections below would never fire.
    ui.worker = WorkerThread(ui)
    ui.thread = ui.worker  # kept for code that still reads ui.thread
    ui.worker.finished.connect(lambda: ui.button_start.setEnabled(True))
    ui.worker.finished.connect(ui.worker.deleteLater)
    ui.worker.start()
And now the other main part of the script. I know the start_process function itself is really messy, but I haven't taken the time to simplify it.
def set_ui(ui):
    """Connect signals, set the initial widget state and add runtime attributes.

    Args:
        ui: The generated UI object, after ``setupUi`` has been called on it.
    """
    # Menubar behavior
    ui.action_exit.triggered.connect(sys.exit)

    # Screenshot view behavior: put a QWebEngineView in the grid cell occupied
    # by the existing ui.screenshot_view. The old widget is removed and
    # deleted so it does not stay stacked underneath the new one.
    previous_view = ui.screenshot_view
    position = ui.gridLayout.getItemPosition(ui.gridLayout.indexOf(previous_view))
    ui.gridLayout.removeWidget(previous_view)
    previous_view.deleteLater()
    ui.screenshot_view = QWebEngineView(ui.centralwidget)
    # position is (row, column, rowSpan, columnSpan)
    ui.gridLayout.addWidget(ui.screenshot_view, *position)

    # Button behavior
    ui.button_start.setEnabled(False)
    ui.button_screenshot.setEnabled(False)
    ui.button_reboot.setEnabled(False)
    ui.button_start.clicked.connect(lambda x: test_thread(ui))
    ui.button_clear.clicked.connect(lambda x: clear_output(ui))
    ui.button_screenshot.clicked.connect(lambda x: update_screenshot(ui))
    ui.button_reboot.clicked.connect(lambda x: reboot_equipment(ui))
    ui.buttonGroup.buttonClicked.connect(lambda x: select_siv(ui))

    # Entry behavior
    ui.entry_equipment.textChanged.connect(lambda x: validate_name_input(ui, x))
    ui.entry_serialnumber.textChanged.connect(lambda x: validate_serial_number_input(ui, x))

    # Instance variables
    ui.ip_address = None
    ui.username = None
    ui.password = None
    ui.entry_name_set = False
    ui.entry_serial_number_set = False
    ui.constructor = None
    # checkedButton() returns None when no button of the group is checked.
    checked_button = ui.buttonGroup.checkedButton()
    ui.siv = checked_button.text() if checked_button is not None else None
    ui.label_version.setText(f'Version {APP_VERSION}')
# UI functions
def start_process(ui):
"""
Launch the process once the start button is pressed.
"""
convert_sn_to_fqdn(ui)
# Perform ping test
if features.test_ping(ui.ip_address):
write_output(ui, '- Ping OK.')
ui.progressBar.setProperty('value', 15)
ui.constructor = find_constructor(ui)
match ui.constructor:
case '1':
write_output(ui, '- Deployment soft 1.')
write_output(ui, '- Installation.')
case '2':
write_output(ui, '- Deployment soft 2.')
write_output(ui, '- Installation.')
case _:
write_output(ui, '- Error : Unknown constructor.')
if ui.constructor:
ui.progressBar.setProperty('value', 40)
status_copy = copy_file(ui, ui.constructor)
if status_copy == True:
ui.progressBar.setProperty('value', 70)
match ui.constructor:
case '1':
match ui.siv:
case 'a':
param_install = 'stuff_a'
case 'b':
param_install = 'stuff_b'
features.snmpset(ui.ip_address, '%OID%', param_install)
features.snmpset(ui.ip_address, '%OID', '1')
case '2':
match ui.siv:
case 'a':
param_install = 'stuff_ab'
case 'b':
param_install = 'stuff_bb'
# Set SNMP stuff
features.snmpset(ui.ip_address, '%OID%', 'X')
features.snmpset(ui.ip_address, '%OID%', '1')
# Tempo 5s
time.sleep(5)
# Set other SNMP stuff
features.snmpset(ui.ip_address, '%OID%', param_install)
features.snmpset(ui.ip_address, '%OID%', '1')
write_output(ui, '- Copy finished, wait until the end of the installation on the equipment.')
write_output(ui, '- A check of the installation status will be performed in 60 seconds.')
time.sleep(60)
result_install = features.snmpget_result_install(ui.ip_address, ui.constructor)
if 'OK' in result_install:
write_output(ui, '- Installation successful')
elif 'KO' in result_install:
write_output(ui, '- Installation failed')
else:
i = 1
# Check installation status every 10 seconds for 1 minute in case it takes longer
while i != 6 and result_install == '':
time.sleep(10)
write_output(ui, f'- Installation still ongoing. Please wait. {i}/6')
result_install = features.snmpget_result_install(ui.ip_address, ui.constructor)
i += 1
if i == 6:
write_output(ui, '- Installation corrupted')
elif 'OK' in result_install:
write_output(ui, '- Installation successful')
else:
write_output(ui, '- Installation failed')
ui.button_screenshot.setEnabled(True)
ui.button_reboot.setEnabled(True)
update_screenshot(ui)
else:
write_output(ui, '- Deplyment error')
ui.progressBar.setProperty('value', 100)
else:
write_output(ui, f'- Ping KO. {ui.ip_address}')
ui.progressBar.setProperty('value', 100)
features.send_artium_response(
sn=ui.entry_serialnumber.text(),
dauphine=ui.entry_equipment.text(),
cp=os.getlogin(),
constructor=ui.constructor,
result='KO - Connection failed'
)
if __name__ == '__main__':
    # Application object first: Qt requires it before any widget is created.
    app = QtWidgets.QApplication(sys.argv)
    app.setApplicationName('%NAME')

    # Build the main window from the generated UI class, then wire it up.
    window = QtWidgets.QMainWindow()
    ui = Ui_Window()
    ui.setupUi(window)
    set_ui(ui)

    # Center the window: move its frame rectangle onto the middle of the
    # available desktop area and place the window at that rectangle's corner.
    screen_center = QtWidgets.QDesktopWidget().availableGeometry().center()
    window_frame = window.frameGeometry()
    window_frame.moveCenter(screen_center)
    window.move(window_frame.topLeft())

    window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)
Hoping you can help me with this!