I am creating Test Runner in Python that runs tests in parallel using multiprocess (fork of multiprocessing).
For now everything works fine, tests run well in this mode but the goal that I want to achieve right now is to being able to gather results in main (parent) process.
I have this Results class that I use to append particular TestCases:
class Results(object):
def __init__(self,
status: Status = Status.PASS,
message: str | None = None
) -> None:
self.status = status
self.message = message
self.tests = []
def add(self, test: TestCase) -> None:
"""Add a Test to the list of tests."""
if test.status is Status.FAIL:
self.fail()
self.tests.append(test)
def fail(self) -> None:
"""Indicate the test run had at least one failure."""
self.status = Status.FAIL
I have created this ShareManager class like in here but sadly I can't really find any tests in my list after the execution. The self.results should be shared class variable in which I am appending test instances. I am sure that tests are running. What did I do wrong with this shared variable?
class ShareManager(BaseManager):
pass
class MyProxy(NamespaceProxy):
_exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'add')
def Manager():
m = ShareManager()
m.start()
return m
ShareManager.register('Results', Results, MyProxy)
class Runner(object):
"""
Class responsible for creating processes in which
each test is executed separately and independently.
"""
def __init__(self) -> None:
self.processes: list = []
self.test_tree: list[dict] = create_tree()
manager = Manager()
self.results = manager.Results()
def run_tests(self) -> None:
concurrency = 2
semaphore = Semaphore(concurrency)
start_time = get_time()
# For each module in test hierarchy.
for module in self.test_tree:
module, mod_name, mod_members = self.importer(module)
# For each TestSuite in module.
for test_class in mod_members:
_class = getattr(module, next(iter(test_class.keys())))
# For each test in given TestSuite.
for test_name in test_class[next(iter(test_class.keys()))]:
_test_instance = _class(test_name)
# HERE I TRY TO ADD TEST INSTANCES TO MY SHARED CLASS
self.results.add(_test_instance)
semaphore.acquire()
process = S_Process(
target=getattr(_class, test_name),
args=(_test_instance,),
test_name=test_name,
start_time=start_time,
semaphore=semaphore,
)
process.start()
self.processes.append(process)
self.get_results()
def get_results(self) -> None:
"""
Wait untill all processes are finished and get test results.
"""
for process in self.processes:
process.join()
# SADLY, [] IS PRINTED HERE
print(self.results.tests)