I was looking for some good example of managing worker process from Qt GUI created in Python. I need this to be as complete as possible, including reporting progress from the process, including aborting the process, including handling of possible errors coming from the process.
I only found some semi-finished examples which only did part of work but when I tried to make them complete I failed. My current design comes in three layers:
1) there is the main thread in which resides the GUI and ProcessScheduler
which controls that only one instance of worker process is running and can abort it
2) there is another thread in which I have ProcessObserver
which actually runs the process and understands the stuff coming from queue (which is used for inter-process communication), this must be in non-GUI thread to keep GUI responsive
3) there is the actual worker process which executes a given piece of code (my future intention is to replace multiprocessing
with multiprocess
or pathos
or something else what can pickle function objects, but this is not my current issue) and report progress or result to the queue
Currently I have this snippet (the print
functions in the code are just for debugging and will be deleted eventually):
import multiprocessing
from PySide import QtCore, QtGui
QtWidgets = QtGui
N = 10000000
# I would like this to be a function object
# but multiprocessing cannot pickle it :(
# so I will use multiprocess in the future
CODE = """
# calculates sum of numbers from 0 to n-1
# reports percent progress of finished work
sum = 0
progress = -1
for i in range(n):
sum += i
p = i * 100 // n
if p > progress:
queue.put(["progress", p])
progress = p
queue.put(["result", sum])
"""
class EvalProcess(multiprocessing.Process):
def __init__(self, code, symbols):
super(EvalProcess, self).__init__()
self.code= code
self.symbols = symbols # symbols must contain 'queue'
def run(self):
print("EvalProcess started")
exec(self.code, self.symbols)
print("EvalProcess finished")
class ProcessObserver(QtCore.QObject):
"""Resides in worker thread. Its role is to understand
to what is received from the process via the queue."""
progressChanged = QtCore.Signal(float)
finished = QtCore.Signal(object)
def __init__(self, process, queue):
super(ProcessObserver, self).__init__()
self.process = process
self.queue = queue
def run(self):
print("ProcessObserver started")
self.process.start()
try:
while True:
# this loop keeps running and listening to the queue
# even if the process is aborted
result = self.queue.get()
print("received from queue:", result)
if result[0] == "progress":
self.progressChanged.emit(result[1])
elif result[0] == "result":
self.finished.emit(result[1])
break
except Exception as e:
print(e) # QUESTION: WHAT HAPPENS WHEN THE PROCESS FAILS?
self.process.join() # QUESTION: DO I NEED THIS LINE?
print("ProcessObserver finished")
class ProcessScheduler(QtCore.QObject):
"""Resides in the main thread."""
sendText = QtCore.Signal(str)
def __init__(self):
super(ProcessScheduler, self).__init__()
self.observer = None
self.thread = None
self.process = None
self.queue = None
def start(self):
if self.process: # Q: IS THIS OK?
# should kill current process and start a new one
self.abort()
self.queue = multiprocessing.Queue()
self.process = EvalProcess(CODE, {"n": N, "queue": self.queue})
self.thread = QtCore.QThread()
self.observer = ProcessObserver(self.process, self.queue)
self.observer.moveToThread(self.thread)
self.observer.progressChanged.connect(self.onProgressChanged)
self.observer.finished.connect(self.onResultReceived)
self.thread.started.connect(self.observer.run)
self.thread.finished.connect(self.onThreadFinished)
self.thread.start()
self.sendText.emit("Calculation started")
def abort(self):
self.process.terminate()
self.sendText.emit("Aborted.")
self.onThreadFinished()
def onProgressChanged(self, percent):
self.sendText.emit("Progress={}%".format(percent))
def onResultReceived(self, result):
print("onResultReceived called")
self.sendText.emit("Result={}".format(result))
self.thread.quit()
def onThreadFinished(self):
print("onThreadFinished called")
self.thread.deleteLater() # QUESTION: DO I NEED THIS LINE?
self.thread = None
self.observer = None
self.process = None
self.queue = None
if __name__ == '__main__':
app = QtWidgets.QApplication([])
scheduler = ProcessScheduler()
window = QtWidgets.QWidget()
layout = QtWidgets.QVBoxLayout(window)
startButton = QtWidgets.QPushButton("sum(range({}))".format(N))
startButton.pressed.connect(scheduler.start)
layout.addWidget(startButton)
abortButton = QtWidgets.QPushButton("Abort")
abortButton.pressed.connect(scheduler.abort)
layout.addWidget(abortButton)
console = QtWidgets.QPlainTextEdit()
scheduler.sendText.connect(console.appendPlainText)
layout.addWidget(console)
window.show()
app.exec_()
It works kind of OK but it still lacks proper error handling and aborting of process. Especially I am now struggling with the aborting. The main problem is that the worker thread keeps running (in the loop listening to the queue) even if the process has been aborted/terminated in the middle of calculation (or at least it prints this error in the console QThread: Destroyed while thread is still running
). Is there a way to solve this? Or any alternative approach? Or, if possible, any real-life and compete example of such task fulfilling all the requirements mentioned above? Any comment would be much appreciated.