2

I was looking for some good example of managing worker process from Qt GUI created in Python. I need this to be as complete as possible, including reporting progress from the process, including aborting the process, including handling of possible errors coming from the process.

I only found some semi-finished examples which only did part of work but when I tried to make them complete I failed. My current design comes in three layers:

1) there is the main thread in which resides the GUI and ProcessScheduler which controls that only one instance of worker process is running and can abort it

2) there is another thread in which I have ProcessObserver which actually runs the process and understands the stuff coming from queue (which is used for inter-process communication), this must be in non-GUI thread to keep GUI responsive

3) there is the actual worker process which executes a given piece of code (my future intention is to replace multiprocessing with multiprocess or pathos or something else what can pickle function objects, but this is not my current issue) and report progress or result to the queue

Currently I have this snippet (the print functions in the code are just for debugging and will be deleted eventually):

import multiprocessing

from PySide import QtCore, QtGui
QtWidgets = QtGui

N = 10000000

# I would like this to be a function object
# but multiprocessing cannot pickle it :(
# so I will use multiprocess in the future
CODE = """
# calculates sum of numbers from 0 to n-1
# reports percent progress of finished work

sum = 0
progress = -1
for i in range(n):
    sum += i
    p = i * 100 // n
    if p > progress:
        queue.put(["progress", p])
        progress = p
queue.put(["result", sum])
"""


class EvalProcess(multiprocessing.Process):

    def __init__(self, code, symbols):
        super(EvalProcess, self).__init__()
        self.code= code
        self.symbols = symbols  # symbols must contain 'queue'

    def run(self):
        print("EvalProcess started")
        exec(self.code, self.symbols)
        print("EvalProcess finished")


class ProcessObserver(QtCore.QObject):
    """Resides in worker thread. Its role is to understand
    to what is received from the process via the queue."""

    progressChanged = QtCore.Signal(float)
    finished = QtCore.Signal(object)

    def __init__(self, process, queue):
        super(ProcessObserver, self).__init__()
        self.process = process
        self.queue = queue

    def run(self):
        print("ProcessObserver started")
        self.process.start()
        try:
            while True:
                # this loop keeps running and listening to the queue
                # even if the process is aborted
                result = self.queue.get()
                print("received from queue:", result)
                if result[0] == "progress":
                    self.progressChanged.emit(result[1])
                elif result[0] == "result":
                    self.finished.emit(result[1])
                    break

        except Exception as e:
            print(e)  # QUESTION: WHAT HAPPENS WHEN THE PROCESS FAILS?
        self.process.join()  # QUESTION: DO I NEED THIS LINE?
        print("ProcessObserver finished")


class ProcessScheduler(QtCore.QObject):
    """Resides in the main thread."""

    sendText = QtCore.Signal(str)

    def __init__(self):
        super(ProcessScheduler, self).__init__()
        self.observer = None
        self.thread = None
        self.process = None
        self.queue = None

    def start(self):
        if self.process:  # Q: IS THIS OK?
            # should kill current process and start a new one
            self.abort()
        self.queue = multiprocessing.Queue()
        self.process = EvalProcess(CODE, {"n": N, "queue": self.queue})
        self.thread = QtCore.QThread()
        self.observer = ProcessObserver(self.process, self.queue)
        self.observer.moveToThread(self.thread)
        self.observer.progressChanged.connect(self.onProgressChanged)
        self.observer.finished.connect(self.onResultReceived)
        self.thread.started.connect(self.observer.run)
        self.thread.finished.connect(self.onThreadFinished)
        self.thread.start()
        self.sendText.emit("Calculation started")

    def abort(self):
        self.process.terminate()
        self.sendText.emit("Aborted.")
        self.onThreadFinished()

    def onProgressChanged(self, percent):
        self.sendText.emit("Progress={}%".format(percent))

    def onResultReceived(self, result):
        print("onResultReceived called")
        self.sendText.emit("Result={}".format(result))
        self.thread.quit()

    def onThreadFinished(self):
        print("onThreadFinished called")
        self.thread.deleteLater()  # QUESTION: DO I NEED THIS LINE?
        self.thread = None
        self.observer = None
        self.process = None
        self.queue = None


if __name__ == '__main__':
    app = QtWidgets.QApplication([])

    scheduler = ProcessScheduler()

    window = QtWidgets.QWidget()
    layout = QtWidgets.QVBoxLayout(window)

    startButton = QtWidgets.QPushButton("sum(range({}))".format(N))
    startButton.pressed.connect(scheduler.start)
    layout.addWidget(startButton)

    abortButton = QtWidgets.QPushButton("Abort")
    abortButton.pressed.connect(scheduler.abort)
    layout.addWidget(abortButton)

    console = QtWidgets.QPlainTextEdit()
    scheduler.sendText.connect(console.appendPlainText)
    layout.addWidget(console)

    window.show()
    app.exec_()

It works kind of OK but it still lacks proper error handling and aborting of process. Especially I am now struggling with the aborting. The main problem is that the worker thread keeps running (in the loop listening to the queue) even if the process has been aborted/terminated in the middle of calculation (or at least it prints this error in the console QThread: Destroyed while thread is still running). Is there a way to solve this? Or any alternative approach? Or, if possible, any real-life and compete example of such task fulfilling all the requirements mentioned above? Any comment would be much appreciated.

  • Does http://stackoverflow.com/a/41605909/869951 help? – Oliver Jan 12 '17 at 06:25
  • Well, no,this is something else. I need worker processes instead of threads. The example seems to me a bit artificial because it keeps the GUI responsive just because of using sleep (which I think yields the interpreter to another thread) and calls `processEvents()` inside the thread execution. Without these I doubt the GUI would be responsive (it would be probably stuttering heavily). Moreover it cannot benefit from multiple cores. – HiFile.app - best file manager Jan 12 '17 at 12:25
  • Incorrect, the GUI remains responsive even without it. The processEvents allows the *non-gui thread* to remain responsive to GUI. Without it, the thread would not process abort signal until work was finished. Try it, it is really important principle: responsive= processes events. In any case, we use multiprocess in our app so I can take a closer look later. If the main problem is the "Destroyed while thread running", that is most likely because it's event loop is still running, call wait() on it after quit() (quit is not immediate). – Oliver Jan 12 '17 at 14:44
  • I will check your code again and thanks for clarification of some of the points. However my main intention for multiprocessing was to really use more cores and not being limited by GIL. – HiFile.app - best file manager Jan 12 '17 at 15:41
  • I understand, that's what we use multiprocessing for. They are separate issues. Have you tried adding the call to QThread.wait()? this is almost certainly the problem for error message. – Oliver Jan 12 '17 at 19:00

0 Answers0