I'm setting up a library to spawn as many threads as I can and have them complete any task they are assigned too. When unit testing the creation of the object containing 'n' number of threads and trying to iterate over each one at the object level I am running into an infinite loop.
I fixed the pickling issue with race conditions, but now when trying to output how many threads were created in a for loop, when debugging I encounter an infinite loop.
from __future__ import print_function
try:
import sys
from threading import Thread, Lock, current_thread
from queue import Queue
except ImportError:
raise ImportError
class Worker(Thread):
""" Thread executing tasks from a given tasks queue """
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
try:
# Acquire locking mechanism for threads to prevent race condition
Lock.acquire()
func(*args, **kargs)
# Release locking mechanism
Lock.release()
except Exception as e:
# An exception happened in this thread
raise e
finally:
if self.tasks is None:
# Mark process done once there are no more tasks to process
self.tasks.task_done()
class SpawnThreads:
"""Pool of threads consuming tasks from a queue."""
def __init__(self, num_threads: int):
self.tasks = Queue(num_threads)
self.num_threads = num_threads
for _ in range(self.num_threads):
Worker(self.tasks)
def __iter__(self):
return self
def __next__(self):
next_value = 0
while next_value < self.num_threads:
try:
next_value += 1
except Exception:
raise StopIteration
return next_value
def add_task(self, func, *args, **kargs):
"""Add a task to the queue."""
self.tasks.put((func, args, kargs))
def task_list(self, func, args_list):
"""Add a list of tasks to the queue."""
for args in args_list:
self.add_task(func, args)
def wait_completion(self):
""".Wait for completion of all the tasks in the queue."""
self.tasks.join()
def get_qsize(self):
"""Return the approximate size of the queue."""
return self.tasks.qsize()
def get_current_thread(self):
return current_thread()
This is the unittest to evaluate the creation of the thread spawning object and iterate and access each individual thread.
import pytest
import unittest
import SpawnThreads
@unittest
class TestThreadFactory(unittest.TestCase):
def test_spawn_threads(self):
workforce = SpawnThreads(5)
self.assertIsNotNone(workforce)
print(workforce)
for w in workforce:
print(w)
The expected output should be the address space/object, which is each thread (5 total).
More Specifically I would like to see this result 5 times in the console:
<ThreadFactory.thread_factory.SpawnThreads object at 0x0000024E851F5208>
I am getting the integer 5 returned infinitely instead of the 5 addresses of threads.