
I'm setting up a library that spawns as many threads as I can and has them complete any task they are assigned to. When unit testing the creation of the object that contains n threads, and trying to iterate over each thread at the object level, I run into an infinite loop.

I fixed the pickling issue with race conditions, but now, when I try to print how many threads were created using a for loop, I hit an infinite loop while debugging.

from __future__ import print_function

# Plain imports: re-raising a bare ImportError would only hide the original message
import sys
from threading import Thread, Lock, current_thread
from queue import Queue


class Worker(Thread):
    """ Thread executing tasks from a given tasks queue """

    # One lock shared by all workers; Lock is a factory, so it must be called to get an instance
    lock = Lock()

    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                # Hold the shared lock while the task runs to prevent race conditions
                with self.lock:
                    func(*args, **kargs)
            finally:
                # Always mark the task done so that Queue.join() (wait_completion) can return
                self.tasks.task_done()


class SpawnThreads:
    """Pool of threads consuming tasks from a queue."""

    def __init__(self, num_threads: int):
        self.tasks = Queue(num_threads)
        self.num_threads = num_threads
        for _ in range(self.num_threads):
            Worker(self.tasks)

    def __iter__(self):
        return self

    def __next__(self):
        next_value = 0
        while next_value < self.num_threads:
            try:
                next_value += 1
            except Exception:
                raise StopIteration
        return next_value

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue."""
        self.tasks.put((func, args, kargs))

    def task_list(self, func, args_list):
        """Add a list of tasks to the queue."""
        for args in args_list:
            self.add_task(func, args)

    def wait_completion(self):
        """.Wait for completion of all the tasks in the queue."""
        self.tasks.join()

    def get_qsize(self):
        """Return the approximate size of the queue."""
        return self.tasks.qsize()

    def get_current_thread(self):
        return current_thread()

This is the unit test that checks the creation of the thread-spawning object and then iterates over it to access each individual thread.

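import unittest
# Import the class, not the module (calling a module raises TypeError)
from ThreadFactory.thread_factory import SpawnThreads


# No decorator needed: unittest is a module, not a class decorator
class TestThreadFactory(unittest.TestCase):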

    def test_spawn_threads(self):
        workforce = SpawnThreads(5)

        self.assertIsNotNone(workforce)

        print(workforce)

        for w in workforce:
            print(w)

The expected output is the address/object of each thread (5 in total).

More specifically, I would like to see this result 5 times in the console:

<ThreadFactory.thread_factory.SpawnThreads object at 0x0000024E851F5208>

Instead, I get the integer 5 printed forever rather than the addresses of the 5 threads.

Justin Reddick
  • Possible duplicate of [Build a Basic Python Iterator](https://stackoverflow.com/questions/19151/build-a-basic-python-iterator) – krisz Aug 09 '19 at 16:35
  • @krisz I saw that, but I would like to print how many objects I am actually creating, and even with that method I am receiving an infinite loop of w showing the int '5', but continuously doing the same thing. – Justin Reddick Aug 09 '19 at 16:41
  • Didn't you mean `` 5 times? There is only a single `SpawnThreads` object. – krisz Aug 09 '19 at 17:20
  • @krisz yes that is what I wanted actually – Justin Reddick Aug 09 '19 at 17:22

2 Answers


Your __next__ is, by definition, always going to return 5 and never end. __next__ isn't a generator function, so no state is carried between calls aside from what's stored on self. On every call you loop until next_value (a local variable that starts again at 0 each time) equals self.num_threads (a value that never changes), and then return it. Your __next__ could be simplified to just return self.num_threads, with no chance of StopIteration ever being raised (hence the infinite loop).
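A minimal sketch of that behaviour, using a hypothetical stand-in class `Counter` (no threads) with the same `__next__` logic: every call restarts from 0 and returns `num_threads`, so `StopIteration` is never raised. `itertools.islice` is used here only to cap the output so the demo terminates.

```python
from itertools import islice


class Counter:
    """Hypothetical stand-in for SpawnThreads with the original __next__ logic."""

    def __init__(self, num_threads):
        self.num_threads = num_threads

    def __iter__(self):
        return self

    def __next__(self):
        next_value = 0  # local variable: reset to 0 on every call
        while next_value < self.num_threads:
            next_value += 1  # an int increment never raises, so StopIteration is unreachable
        return next_value  # always equals self.num_threads


# A plain `for` loop over Counter(5) would never end; islice stops after 8 items.
first_values = list(islice(Counter(5), 8))
print(first_values)  # [5, 5, 5, 5, 5, 5, 5, 5]
```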

If you want it to return different values (specifically, each of your workers), you'll need state for that:

class SpawnThreads:
    """Pool of threads consuming tasks from a queue."""

    def __init__(self, num_threads: int):
        self.tasks = Queue(num_threads)
        self.next_value = 0               # Initial next_value in instance state
        self.num_threads = num_threads
        # Store list of workers
        self.workers = [Worker(self.tasks) for _ in range(self.num_threads)]

    def __iter__(self):
        return self

    def __next__(self):
        # Check if iteration has finished
        if self.next_value >= self.num_threads:
            raise StopIteration
        retval = self.workers[self.next_value]  # Save value to return to local
        self.next_value += 1                    # Increment state for next use
        return retval                           # Return value

Those last three lines could be replaced with an alternative tricksy approach to avoid the local variable if you really care:

        try:
            return self.workers[self.next_value]
        finally:
            self.next_value += 1
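To see why the `try`/`finally` variant still returns the current item before advancing, here is a small sketch with a hypothetical `Cursor` class over a plain list: the `return` expression is evaluated first, then the `finally` block increments the index, and the already-computed value is what gets returned.

```python
class Cursor:
    """Hypothetical iterator over a list using the try/finally trick."""

    def __init__(self, items):
        self.items = items
        self.next_value = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.next_value >= len(self.items):
            raise StopIteration
        try:
            # The value to return is computed here, before `finally` runs
            return self.items[self.next_value]
        finally:
            # Runs after the value is computed, advancing state for the next call
            self.next_value += 1


c = Cursor(["a", "b", "c"])
print(list(c))  # ['a', 'b', 'c']
print(list(c))  # [] because the iterator is exhausted
```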

Or even better, you could use Python built-ins to do the work for you:

class SpawnThreads:
    """Pool of threads consuming tasks from a queue."""

    def __init__(self, num_threads: int):
        self.tasks = Queue(num_threads)

        self.num_threads = num_threads
        self.workers = [Worker(self.tasks) for _ in range(self.num_threads)]
        self.next_worker_iter = iter(self.workers) # Iterates workers

    def __iter__(self):
        return self

    def __next__(self):
        # Let the list iterator do the work of maintaining state,
        # raising StopIteration, etc.
        return next(self.next_worker_iter)

This approach is simpler, faster, and as a bonus, thread-safe, at least on CPython (if two threads iterate the same SpawnThreads instance, each of the workers will be produced exactly once, rather than values potentially being skipped or repeated).
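A rough sketch of the thread-safety claim, assuming standard CPython with the GIL: several threads pull from one shared `iter(...)` over a list, and each element comes out exactly once. The names `shared`, `consume`, and `results` are illustrative only, and one run agreeing with the claim is evidence, not proof.

```python
import threading

items = list(range(10_000))
shared = iter(items)  # one list iterator shared by all threads
results = []
results_lock = threading.Lock()


def consume():
    local = []
    # The shared list iterator raises StopIteration when exhausted, ending the loop
    for item in shared:
        local.append(item)
    with results_lock:
        results.extend(local)


threads = [threading.Thread(target=consume) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every item appears exactly once across all threads: nothing skipped or repeated
print(sorted(results) == items)  # True
```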

If the goal is to make an iterable (can be iterated multiple times), not an iterator (can be iterated once from beginning to end and never again), the simplest solution is to make __iter__ return an iterator itself, removing the need for __next__ entirely:

class SpawnThreads:
    """Pool of threads consuming tasks from a queue."""

    def __init__(self, num_threads: int):
        self.tasks = Queue(num_threads)

        self.num_threads = num_threads
        self.workers = [Worker(self.tasks) for _ in range(self.num_threads)]

    def __iter__(self):
        # Makes this a generator function that produces each Worker once per loop
        yield from self.workers
        # Alternatively, replace the line above with:
        #     return iter(self.workers)
        # though that exposes more implementation details than an anonymous generator
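The iterator/iterable distinction shows up as soon as you loop twice. A minimal sketch with two hypothetical classes holding plain strings instead of `Worker` threads: the iterator version (storing `iter(self.workers)` in `__init__`) is exhausted after one pass, while the generator-based `__iter__` starts fresh on every loop.

```python
class PoolIterator:
    """Iterator: state lives on the instance, so it can only be consumed once."""

    def __init__(self, workers):
        self.workers = list(workers)
        self.next_worker_iter = iter(self.workers)

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.next_worker_iter)


class PoolIterable:
    """Iterable: each call to __iter__ creates a new generator."""

    def __init__(self, workers):
        self.workers = list(workers)

    def __iter__(self):
        yield from self.workers


names = ["w1", "w2", "w3"]  # placeholders for Worker objects
once = PoolIterator(names)
many = PoolIterable(names)

print(list(once), list(once))  # ['w1', 'w2', 'w3'] []
print(list(many), list(many))  # ['w1', 'w2', 'w3'] ['w1', 'w2', 'w3']
```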
ShadowRanger

The problem is this method:

def __next__(self):
    next_value = 0
    while next_value < self.num_threads:
        try:
            next_value += 1
        except Exception:
            raise StopIteration
    return next_value

It's basically the same as

def __next__(self):
    return self.num_threads

As you can see, there isn't any iterator state, so it will just return the same number forever. next_value += 1 will never throw an exception, because next_value is just an integer.

To achieve what you want, just store the threads in a container and return an iterator to that container. Modify SpawnThreads:

def __init__(self, num_threads: int):
    self.tasks = Queue(num_threads)
    self.num_threads = num_threads
    self.threads = []
    for _ in range(self.num_threads):
        self.threads.append(Worker(self.tasks))

def __iter__(self):
    return iter(self.threads)

# remove the __next__() method
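A self-contained sketch of this approach end to end. The `Worker` here is a simplified assumption made for the sketch (no lock, and `task_done()` is called unconditionally so that `wait_completion()` can return); it is not part of this answer. Iterating the pool yields the five `Worker` objects, and because `__iter__` returns a fresh list iterator, the loop can be repeated.

```python
import threading
from queue import Queue


class Worker(threading.Thread):
    """Daemon thread executing tasks from a shared queue (simplified for this sketch)."""

    def __init__(self, tasks):
        super().__init__(daemon=True)
        self.tasks = tasks
        self.start()

    def run(self):
        while True:
            func, args, kwargs = self.tasks.get()
            try:
                func(*args, **kwargs)
            finally:
                # Always mark the task done, otherwise Queue.join() blocks forever
                self.tasks.task_done()


class SpawnThreads:
    """Pool of threads that is iterable over its Worker objects."""

    def __init__(self, num_threads: int):
        self.tasks = Queue(num_threads)
        self.num_threads = num_threads
        self.threads = [Worker(self.tasks) for _ in range(num_threads)]

    def __iter__(self):
        # A fresh list iterator on every loop, so no __next__ is needed
        return iter(self.threads)

    def add_task(self, func, *args, **kwargs):
        self.tasks.put((func, args, kwargs))

    def wait_completion(self):
        self.tasks.join()


workforce = SpawnThreads(5)
for w in workforce:
    print(w)  # prints each Worker's repr, five lines in total

results = []
for n in range(10):
    workforce.add_task(results.append, n * n)
workforce.wait_completion()
print(sorted(results))
```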
krisz
  • This makes two major changes to what appeared to be the intended behavior: 1) It returns the `Worker` objects, not integer values, and 2) It makes `SpawnThreads` an *iterable*, not an *iterator*; if you loop over the instance twice, you'll generate the values twice, where an iterator is iterated once and only once. – ShadowRanger Aug 09 '19 at 17:05
  • 1. OP wants objects to be returned (though not Thread objects: the same SpawnThreads object 5 times, since he modified the question, which doesn't make any sense). 2. SpawnThreads never was an iterator, since an iterator will hardly ever spawn threads. – krisz Aug 09 '19 at 17:11