0

I have my main function run_tests which firstly starts new, separate Thread that starts new processes and then in main loop I try to detect those that have finished and those that have timeouted.

import time
import traceback

from typing import List
from threading import Thread
from multiprocess import (Semaphore,
                          Process,
                          Pipe)

from hanging_threads import start_monitoring


class S_Process(Process):

    def __init__(self,
                 test_name: str,
                 semaphore: Semaphore,
                 pipe_conn: Pipe,
                 *args,
                 **kwargs
                 ) -> None:

        Process.__init__(self, *args, **kwargs)
        self.__child_conn = pipe_conn
        self.__test_name = test_name
        self.__semaphore = semaphore

    def run(self) -> None:
        self.__semaphore.acquire()
        self.__child_conn.send(0)

        Process.run(self)
        self.__child_conn.send(self.__test_name)
        self.__semaphore.release()

    def terminate(self) -> None:
        self.__semaphore.release()
        super().terminate()

    @property
    def test_name(self) -> str:
        return self.__test_name


class Task(object):

    def __init__(self,
                 process: S_Process,
                 pipe_conn: Pipe,
                 ) -> None:
        self.process = process
        self.pipe_conn = pipe_conn
        self.duration = None
        self.test_name = None
        self.status = 'NOTRUN'

    def run(self) -> None:
        self.process.start()
        self.pipe_conn.recv()
        self.duration = time.perf_counter()
        self.status = 'RUNNING'

    def join(self) -> None:
        self.process.join()

        if self.process.is_alive():
            self.process.kill()
        self.set_result()

    def terminate(self) -> None:
        self.process.terminate()

    def set_result(self) -> None:
        self.test_name = self.pipe_conn.recv()
        self.status = 'ENDED'


class Tasks(object):

    def __init__(self) -> None:
        self.remaining: List[Task] = []
        self.completed: List[Task] = []

    def add(self,
            process: S_Process,
            pipe_conn: Pipe
            ) -> None:
        task = Task(process, pipe_conn)
        self.remaining.append(task)

    def complete(self, task: Task) -> None:
        self.completed.append(task)
        self.remaining.remove(task)

    def info(self) -> List[str]:
        output: List[str] = []
        for task in self.completed:
            output.append(f"Test Name: {task.result.test_name} " +
                          f"Result: {task.result.status} " +
                          f"Duration: {task.result.duration} " +
                          f"Retries: {task.result.retries}")
        return output


def run_tests() -> None:

    start_monitoring()
    tasks = Tasks()
    semaphore = Semaphore(2)

    for i in range(8):
        parent_conn, child_conn = Pipe()

        process = S_Process(
            target=test_function,
            args=(),
            test_name=f'test_{i}',
            semaphore=semaphore,
            pipe_conn=child_conn
        )
        tasks.add(process, parent_conn)

    def runner(tasks):
        try:
            for task in tasks:
                print('running task')
                task.run()

        except Exception:
            print(traceback.format_exc())

    TIMEOUT = 5

    runner = Thread(target=runner, args=(tasks.remaining,))
    runner.start()

    while tasks.remaining:
        for task in tasks.remaining:

            if not task.process.is_alive() and task.status == 'RUNNING':
                print('JOINING:', task.process.test_name)
                task.join()
                tasks.complete(task)

            if task.status == "RUNNING":
                check_time = time.perf_counter() - task.duration

                if (check_time > TIMEOUT):
                    print('TERMINATING:', task.process.test_name)
                    task.terminate()
                    tasks.complete(task)

        print('Rem:', len(tasks.remaining))
        print('End:', len(tasks.completed))
        time.sleep(0.2)


def test_function():
    print('test_func')
    time.sleep(3)


if __name__ == "__main__":
    run_tests()

The method task.run() starts the process and waits for pipe_conn.recv() to get info that process has indeed acquired semaphore and started working so I can measure its time duration.

When I set sempaphore i.e. to value "2" (max 2 processes can run simultaneously) with 7-8 tasks and start run_tests it goes well until third/fourth process is being joined/terminated. Thanks to hanging_threads package I discovered that my runner thread dies with this error:

---------- Thread 9068 "Thread-2 (runner)" hangs  ----------
        File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 973, in _bootstrap
                self._bootstrap_inner()
        File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 1016, in _bootstrap_inner
                self.run()
        File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 953, in run
                self._target(*self._args, **self._kwargs)
        File "c:\Users\<user>\Documents\Projects\eightest\eightest\runner.py", line 261, in runner
                task.run()
        File "c:\Users\<user>\Documents\Projects\eightest\eightest\runner.py", line 64, in run
                if self.pipe_conn.recv() == 'started':
        File "C:\Users\<user>\Documents\Projects\eightest\.venv\lib\site-packages\multiprocess\connection.py", line 258, in recv
                buf = self._recv_bytes()
        File "C:\Users\<user>\Documents\Projects\eightest\.venv\lib\site-packages\multiprocess\connection.py", line 313, in _recv_bytes
                waitres = _winapi.WaitForMultipleObjects(

Why first few processes start and end well and then at some point thread is not able to handle the Pipe? Also the loop hangs on 6 tasks ended and 2 remaining and those 2 can never be started.

Alraku
  • 45
  • 1
  • 10
  • You have in method `task.run` the statement `self.pipe_conn.recv() == 'started':`, but that clearly cannot be (the stack trace suggests that it is `if self.pipe_conn.recv() == 'started':` and so you also have an indentation error. Also, in `S_Process.__init__` you hint that `pipe_conn` is a `Pipe`, but that is not correct either. How are these `multiprocessing.connection.Connection` instances being created? **You need to post a [minimal, reproducible example](https://stackoverflow.com/help/minimal-reproducible-example), ideally in a single source listing.** – Booboo Oct 15 '22 at 13:02
  • @Booboo Thank you for your response. I edited my post and added fully reproducible example which certainly shows that something hangs during main loop. Also I recommend to download multiprocess that I use (imports). – Alraku Oct 15 '22 at 15:26

1 Answers1

1

I spent quite a while on this trying to figure it out. The problem is that your task runner thread has:

            for task in tasks:
                print('running task')
                task.run()

where tasks is a reference to tasks.remaining. The problem arises from trying to iterate what is essentially the tasks.remaining list while the main thread is removing tasks from this very same list. As a result (on my desktop) two tasks never get iterated and thus never started. The solution is for the task runner thread to iterate a copy of the tasks.remaining list.

I have made other changes to your code. All of my modifications are commented with #Booboo. Also, I do not have a hanging_threads module and so I commented out the monitor-related code:

import time
import traceback

from typing import List
from threading import Thread
from multiprocess import (Semaphore,
                          Process,
                          Pipe)


#from hanging_threads import start_monitoring #Booboo


class S_Process(Process):

    def __init__(self,
                 test_name: str,
                 semaphore: Semaphore,
                 pipe_conn: Pipe,
                 *args,
                 **kwargs
                 ) -> None:

        Process.__init__(self, *args, **kwargs)
        self.__child_conn = pipe_conn
        self.__test_name = test_name
        self.__semaphore = semaphore

    def run(self) -> None:
        with self.__semaphore: #Booboo
            """
            """
            self.__child_conn.send(0)

            #Booboo we must catch any possible exceptions raise by the
            # target function to ensure we do the send below:
            try:
                Process.run(self)
            except Exception as e:
                print(e)
            self.__child_conn.send(self.__test_name)

    """
    def terminate(self) -> None:
        self.__semaphore.release()
        super().terminate()
    """

    @property
    def test_name(self) -> str:
        return self.__test_name


class Task(object):

    def __init__(self,
                 process: S_Process,
                 pipe_conn: Pipe,
                 ) -> None:
        self.process = process
        self.pipe_conn = pipe_conn
        self.duration = None
        self.test_name = None
        self.status = 'NOTRUN'

    def run(self) -> None:
        self.process.start()
        self.pipe_conn.recv()
        self.duration = time.perf_counter()
        self.status = 'RUNNING'

    def join(self) -> None:
        self.process.join()

        #Booboo This method is only called if the process is not alive:
        #assert not self.process.is_alive()
        """ #Booboo
        if self.process.is_alive(): # The process cannot be alive
            self.process.kill()
        """
        self.set_result()

    def terminate(self) -> None:
        self.process.terminate()

    def set_result(self) -> None:
        self.test_name = self.pipe_conn.recv()
        self.duration = time.perf_counter() - self.duration #Booboo
        self.status = 'ENDED'


class Tasks(object):

    def __init__(self) -> None:
        self.remaining: List[Task] = []
        self.completed: List[Task] = []

    def add(self,
            process: S_Process,
            pipe_conn: Pipe,
            ) -> None:
        task = Task(process, pipe_conn)
        self.remaining.append(task)

    def complete(self, task: Task) -> None:
        self.completed.append(task)
        self.remaining.remove(task)

    def info(self) -> List[str]:
        output: List[str] = []
        for task in self.completed:
            output.append(f"Test Name: {task.test_name} " + #Booboo
                          f"Result: {task.status} " + #Booboo
                          f"Duration: {task.duration}") # + #Booboo
                          #f"Retries: {task.result.retries}") #Booboo
        return output


def run_tests() -> None:

    #start_monitoring() #Booboo
    tasks = Tasks()
    semaphore = Semaphore(2)

    for i in range(8):
        parent_conn, child_conn = Pipe()

        process = S_Process(
            target=test_function,
            args=(),
            test_name=f'test_{i}',
            semaphore=semaphore,
            pipe_conn=child_conn
        )
        tasks.add(process, parent_conn)

    def runner(tasks):
        try:
            for task in tasks:
                task.run()
                print('running task', task.process.test_name) #Booboo

        except Exception:
            print(traceback.format_exc())

    TIMEOUT = 5

    runner = Thread(target=runner, args=(tasks.remaining.copy(),)) #Booboo
    runner.start()

    while tasks.remaining:
        for task in tasks.remaining.copy():

            if not task.process.is_alive() and task.status == 'RUNNING':
                print('JOINING:', task.process.test_name)
                task.join()
                tasks.complete(task)
            elif task.status == "RUNNING": #Booboo
                check_time = time.perf_counter() - task.duration

                if (check_time > TIMEOUT):
                    print('TERMINATING:', task.process.test_name)
                    task.terminate()
                    tasks.complete(task)

        print('Rem:', len(tasks.remaining))
        print('End:', len(tasks.completed))
        time.sleep(0.2)


def test_function():
    print('test_func')
    time.sleep(3)


if __name__ == "__main__":
    run_tests()
Booboo
  • 38,656
  • 3
  • 37
  • 60
  • Hi, thank you very much for answer and debug of this problem. Especially if you've spent so much time on it. I marked your answer as accepted answer cause it did solve my problem. I tried to figure it out for 2 days and tbh i am still unsure why .copy() would work solve this. I have read it creates shallow copy of the list so the newly created can be freely modified. But my common sense says that It SHOULD not be different copy, could you explain that to me? – Alraku Oct 16 '22 at 08:53
  • 1
    Your list contains *references* to task objects. If you make a shallow copy of the list, that copy is still referencing the identical task objects. If a thread is iterating the list (slowly) while another thread is removing tasks from the list, depending on how a list is implemented and iterated, it is possible that the iterator thread will permanently skip over tasks that have not been removed. See `test1` in [this demo](https://ideone.com/cjom80) where one thread is removing elements with an even `x` attribute while another thread is iterating the list but skips over one of the odd ones. – Booboo Oct 16 '22 at 10:38
  • See [Dynamic array](https://en.wikipedia.org/wiki/Dynamic_array). See [this](https://stackoverflow.com/questions/6022764/python-removing-list-element-while-iterating-over-list). Your main thread should ideally be iterating over a copy of the list and removing elements from the original list. Otherwise, it may also be skipping over elements. This is not fatal because it repeats the entire iteration until the list is empty. I have made the change to the code above. – Booboo Oct 16 '22 at 10:53
  • Thank you once more for clarification and this additional example. Adjusted my code with your hints. – Alraku Oct 16 '22 at 13:17