0

I am having the following issue:

I am trying to use concurrent.futures.ProcessPoolExecutor(), or anything similar, and show the progress of each process on a tkinter widget.

There is this answer: Python Tkinter multiprocessing progress but I can not quite make it to work.

The following simplified version of my code seems to work only when using ThreadPoolExecutor() which I do not want.

Thanks in advance for any help!

import concurrent.futures
import tkinter
import tkinter.ttk
import multiprocessing
import random
import time


class App:
    def __init__(self, root):
        self.root = root

        self.processes = 5
        self.percentage = []
        self.changing_labels = []
        self.queues = []
        self.values = []

        for i in range(self.processes):
            temp_percentage = tkinter.StringVar()
            temp_percentage.set("0 %")
            self.percentage.append(temp_percentage)

            temp_changing_label = tkinter.Label(self.root, textvariable=temp_percentage)
            temp_changing_label.pack()
            self.changing_labels.append(temp_changing_label)

            self.queues.append(multiprocessing.Queue())
            # Just same values that I want to do calculations on
            temp_value = []
            for ii in range(12):
                temp_value.append(random.randrange(10))
            self.values.append(temp_value.copy())

        self.start_processing()

    def start_processing(self):
        def save_values(my_values):     # Save my new calculated values on the same file or different file
            with open(f"example.txt", "a") as file:
                for v in my_values:
                    file.write(str(v))
                    file.write(" ")
                file.write("\n")

        def work(my_values, my_queue):  # Here I do all my work
            # Some values to calculate my progress so that I can update my Labels
            my_progress = 0
            step = 100 / len(my_values)
            # Do some work on the values
            updated_values = []
            for v in my_values:
                time.sleep(0.5)
                updated_values.append(v + 1)

                my_progress += step
                my_queue.put(my_progress)   # Add current progress to queue

            save_values(updated_values)     # Save it before exiting

        # This Part does no work with ProcessPoolExecutor, with ThreadPoolExecutor it works fine
        with concurrent.futures.ProcessPoolExecutor() as executor:
            results = [executor.submit(work, self.values[i], self.queues[i])
                       for i in range(self.processes)]
            # Run in a loop and update Labels or exit when done
            while True:
                results_done = [result.done() for result in results]

                if False in results_done:
                    for i in range(self.processes):
                        if results_done[i] is False:
                            if not self.queues[i].empty():
                                temp_queue = self.queues[i].get()
                                self.percentage[i].set(f"{temp_queue:.2f} %")
                        else:
                            self.percentage[i].set("100 %")
                        self.root.update()
                else:
                    break
        # Close window at the very end
        self.root.destroy()


def main():  # Please do not change my main unless it is essential
    root = tkinter.Tk()
    my_app = App(root)
    root.mainloop()


if __name__ == "__main__":
    main()
Kounabi
  • 3
  • 2
  • 2
    "Does not work" is not a helpful description of the problem. Also, saying that it "seems to work" with ThreadPoolExecutor() suggests that you can't really tell whether it works or not. You need to explain exactly what happens and also what you *expected* to happen. – Paul Cornelius Oct 22 '21 at 22:16

1 Answers1

0

The problem is caused by an exception being thrown inside the new processes. To see this exception, you can call the Future.result function, which raises the exception, e.g.

print([result.result() for result in results])

This gives the error AttributeError: Can't pickle local object 'App.start_processing.<locals>.work', so the problem is that work was defined inside another function.

After moving the work and save_values functions out of the start_processing method, the error changes to RuntimeError: Queue objects should only be shared between processes through inheritance. This can be fixed by using a multiprocessing.Manager to create the queues:


class App:
    def __init__(self, root):
        ...

        with multiprocessing.Manager() as manager:
            for i in range(self.processes):
                ...

                self.queues.append(manager.Queue())
            ...
            self.start_processing()

The print([result.result() for result in results]) debugging line can now be removed. The full code is as follows:

import concurrent.futures
import tkinter
import tkinter.ttk
import multiprocessing
import random
import time


class App:
    def __init__(self, root):
        self.root = root

        self.processes = 5
        self.percentage = []
        self.changing_labels = []
        self.queues = []
        self.values = []

        with multiprocessing.Manager() as manager:
            for i in range(self.processes):
                temp_percentage = tkinter.StringVar()
                temp_percentage.set("0 %")
                self.percentage.append(temp_percentage)

                temp_changing_label = tkinter.Label(self.root, textvariable=temp_percentage)
                temp_changing_label.pack()
                self.changing_labels.append(temp_changing_label)

                self.queues.append(manager.Queue())
                # Just same values that I want to do calculations on
                temp_value = []
                for ii in range(12):
                    temp_value.append(random.randrange(10))
                self.values.append(temp_value.copy())

            self.start_processing()

    @staticmethod
    def save_values(my_values):  # Save my new calculated values on the same file or different file
        with open(f"example.txt", "a") as file:
            for v in my_values:
                file.write(str(v))
                file.write(" ")
            file.write("\n")

    @classmethod
    def work(cls, my_values, my_queue):  # Here I do all my work
        # Some values to calculate my progress so that I can update my Labels
        my_progress = 0
        step = 100 / len(my_values)
        # Do some work on the values
        updated_values = []
        for v in my_values:
            time.sleep(0.5)
            updated_values.append(v + 1)

            my_progress += step
            my_queue.put(my_progress)  # Add current progress to queue

        cls.save_values(updated_values)  # Save it before exiting

    def start_processing(self):

        # This Part does no work with ProcessPoolExecutor, with ThreadPoolExecutor it works fine
        with concurrent.futures.ProcessPoolExecutor() as executor:
            results = [executor.submit(self.work, self.values[i], self.queues[i])
                       for i in range(self.processes)]
            # Run in a loop and update Labels or exit when done
            while True:
                results_done = [result.done() for result in results]

                if False in results_done:
                    for i in range(self.processes):
                        if results_done[i] is False:
                            if not self.queues[i].empty():
                                temp_queue = self.queues[i].get()
                                self.percentage[i].set(f"{temp_queue:.2f} %")
                        else:
                            self.percentage[i].set("100 %")
                        self.root.update()
                else:
                    break
        # Close window at the very end
        self.root.destroy()


def main():  # Please do not change my main unless it is essential
    root = tkinter.Tk()
    my_app = App(root)
    root.mainloop()


if __name__ == "__main__":
    main()

PS: I would recommend not all(results_done) rather than False in results_done, and not results_done[i] rather than results_done[i] is False.

Oli
  • 2,507
  • 1
  • 11
  • 23