I have my main function run_tests
which firstly starts new, separate Thread that starts new processes and then in main loop I try to detect those that have finished and those that have timeouted.
import time
import traceback
from typing import List
from threading import Thread
from multiprocess import (Semaphore,
Process,
Pipe)
from hanging_threads import start_monitoring
class S_Process(Process):
def __init__(self,
test_name: str,
semaphore: Semaphore,
pipe_conn: Pipe,
*args,
**kwargs
) -> None:
Process.__init__(self, *args, **kwargs)
self.__child_conn = pipe_conn
self.__test_name = test_name
self.__semaphore = semaphore
def run(self) -> None:
self.__semaphore.acquire()
self.__child_conn.send(0)
Process.run(self)
self.__child_conn.send(self.__test_name)
self.__semaphore.release()
def terminate(self) -> None:
self.__semaphore.release()
super().terminate()
@property
def test_name(self) -> str:
return self.__test_name
class Task(object):
def __init__(self,
process: S_Process,
pipe_conn: Pipe,
) -> None:
self.process = process
self.pipe_conn = pipe_conn
self.duration = None
self.test_name = None
self.status = 'NOTRUN'
def run(self) -> None:
self.process.start()
self.pipe_conn.recv()
self.duration = time.perf_counter()
self.status = 'RUNNING'
def join(self) -> None:
self.process.join()
if self.process.is_alive():
self.process.kill()
self.set_result()
def terminate(self) -> None:
self.process.terminate()
def set_result(self) -> None:
self.test_name = self.pipe_conn.recv()
self.status = 'ENDED'
class Tasks(object):
def __init__(self) -> None:
self.remaining: List[Task] = []
self.completed: List[Task] = []
def add(self,
process: S_Process,
pipe_conn: Pipe
) -> None:
task = Task(process, pipe_conn)
self.remaining.append(task)
def complete(self, task: Task) -> None:
self.completed.append(task)
self.remaining.remove(task)
def info(self) -> List[str]:
output: List[str] = []
for task in self.completed:
output.append(f"Test Name: {task.result.test_name} " +
f"Result: {task.result.status} " +
f"Duration: {task.result.duration} " +
f"Retries: {task.result.retries}")
return output
def run_tests() -> None:
start_monitoring()
tasks = Tasks()
semaphore = Semaphore(2)
for i in range(8):
parent_conn, child_conn = Pipe()
process = S_Process(
target=test_function,
args=(),
test_name=f'test_{i}',
semaphore=semaphore,
pipe_conn=child_conn
)
tasks.add(process, parent_conn)
def runner(tasks):
try:
for task in tasks:
print('running task')
task.run()
except Exception:
print(traceback.format_exc())
TIMEOUT = 5
runner = Thread(target=runner, args=(tasks.remaining,))
runner.start()
while tasks.remaining:
for task in tasks.remaining:
if not task.process.is_alive() and task.status == 'RUNNING':
print('JOINING:', task.process.test_name)
task.join()
tasks.complete(task)
if task.status == "RUNNING":
check_time = time.perf_counter() - task.duration
if (check_time > TIMEOUT):
print('TERMINATING:', task.process.test_name)
task.terminate()
tasks.complete(task)
print('Rem:', len(tasks.remaining))
print('End:', len(tasks.completed))
time.sleep(0.2)
def test_function():
print('test_func')
time.sleep(3)
if __name__ == "__main__":
run_tests()
The method task.run() starts the process and waits for pipe_conn.recv() to get info that process has indeed acquired semaphore and started working so I can measure its time duration.
When I set sempaphore i.e. to value "2" (max 2 processes can run simultaneously) with 7-8 tasks and start run_tests it goes well until third/fourth process is being joined/terminated. Thanks to hanging_threads package I discovered that my runner thread dies with this error:
---------- Thread 9068 "Thread-2 (runner)" hangs ----------
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 973, in _bootstrap
self._bootstrap_inner()
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 1016, in _bootstrap_inner
self.run()
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
File "c:\Users\<user>\Documents\Projects\eightest\eightest\runner.py", line 261, in runner
task.run()
File "c:\Users\<user>\Documents\Projects\eightest\eightest\runner.py", line 64, in run
if self.pipe_conn.recv() == 'started':
File "C:\Users\<user>\Documents\Projects\eightest\.venv\lib\site-packages\multiprocess\connection.py", line 258, in recv
buf = self._recv_bytes()
File "C:\Users\<user>\Documents\Projects\eightest\.venv\lib\site-packages\multiprocess\connection.py", line 313, in _recv_bytes
waitres = _winapi.WaitForMultipleObjects(
Why first few processes start and end well and then at some point thread is not able to handle the Pipe? Also the loop hangs on 6 tasks ended and 2 remaining and those 2 can never be started.