
Below I have the Threader class, which I use to thread an arbitrary number of functions and then, after joining the threads, return a list of the threaded functions' return values. One feature I want is the option to return a dictionary instead of a list. One way I found to do this is to require the threaded functions to return a tuple whose first value is used as the key. I would instead like the threaded function's first argument to be used as the key.

I learned that threads can be named, so on thread creation I set each thread's name to its function's first argument. A thread can access its own name with getName(), but how would I get the name of the thread that is next in line to be retrieved with .get() from the queue? (How do I access the thread objects within the queue?)

I just need it to work as described in the first paragraph, so I'm open to alternative methods of achieving the same effect.

from queue import Queue
from threading import Thread

class Threader(object):
    """thread arbitrary number of functions, then block when results wanted

    Attributes:
        thread_queue (Queue): The queue that holds the threads.
        threads (Thread list): Threads of functions added with add_thread.
    """

    def __init__(self):
        self.thread_queue = Queue()
        self.threads = []


    def add_thread(self, func, args):
        """add a function to be threaded"""
        self.threads.append(Thread(
            name=args[0], # Custom name using function's first argument
            target=lambda queue, func_args: queue.put(func(*func_args)),
            args=(self.thread_queue, args)))
        self.threads[-1].start()


    def get_results(self, return_dict=False):
        """block threads until all are done, then return their results

        Args:
            return_dict (bool): Return a dict instead of a list. Requires 
                each thread to return a tuple with two values.
        """

        for thread in self.threads:
            thread.join()

        if return_dict:
            results = {}
            while not self.thread_queue.empty():
                # Setting the dictionary key with returned tuple
                # How to access thread's name?
                key, value = self.thread_queue.get()
                results[key] = value
        else:
            results = []
            while not self.thread_queue.empty():
                results.append(self.thread_queue.get())

        return results

Example usage:

threader = Threader()
for region in regions:
    # probe_region is a function, and (region, tag_filter) are args for it
    threader.add_thread(probe_region, (region, tag_filter))
results = threader.get_results()
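On the thread-name idea: a Queue only ever holds the objects passed to put(), here the functions' return values, so there are no thread objects in it to inspect. What a running thread can do is look up its own Thread object with threading.current_thread() and queue its name together with its result. A minimal sketch, assuming a hypothetical probe() function as a stand-in for probe_region:

```python
from queue import Queue
from threading import Thread, current_thread


def named_worker(results_queue, func, args):
    # current_thread() returns the Thread object running this code, so the
    # name given to Thread(name=...) can be queued together with the result.
    # (.name replaces getName(), which is deprecated since Python 3.10.)
    results_queue.put((current_thread().name, func(*args)))


def probe(region, scale):
    # Hypothetical stand-in for probe_region.
    return len(region) * scale


calls = [(probe, ("eu-west", 2)), (probe, ("us-east-1", 3))]
results_queue = Queue()
threads = [
    Thread(name=args[0], target=named_worker, args=(results_queue, func, args))
    for func, args in calls
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

results = {}
while not results_queue.empty():
    name, value = results_queue.get()
    results[name] = value

print(results == {"eu-west": 14, "us-east-1": 27})  # True
```

One caveat with keying by thread name: in CPython, Thread converts the name with str(), and a falsy name such as 0 or "" is replaced by an auto-generated "Thread-N" name, so this only works cleanly when every first argument is a non-empty string.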

Edit: What I currently use:

My cleaned-up and improved version of AS Mackay's answer (results are returned in the order the threads were added):

from queue import Queue
from threading import Thread

class Threader(object):
    """thread arbitrary number of functions, then block when results wanted

    Attributes:
        result_queue (Queue): Thread-safe queue that holds the results.
        threads (list[Thread]): Threads of functions added with add_thread.
    """

    def __init__(self):
        self.result_queue = Queue()
        self.threads = []


    def worker(self, index, func, fargs):
        """run the threaded function and put its return value into the queue

        The thread's index and the threaded function's first arg are
        inserted into the queue, preceding the threaded function's return.

        Args:
            index (int): Position of the thread, fixed when it was added.
            func, fargs: See add_thread.
        """
        self.result_queue.put([index, fargs[0], func(*fargs)])


    def add_thread(self, func, fargs):
        """add a function to be threaded

        Args:
            func (function): Function to thread.
            fargs (tuple): Argument(s) to pass to the func function.

        Raises:
            ValueError: If func isn't callable, or if fargs isn't a
                non-empty tuple.
        """

        if not callable(func):
            raise ValueError("func must be a function.")
        if not isinstance(fargs, tuple) or not fargs:
            raise ValueError("fargs must be a non-empty tuple.")

        # Capture the index now: reading len(self.threads) inside the worker
        # races with later add_thread calls and can yield duplicate indices.
        index = len(self.threads)
        self.threads.append(Thread(target=self.worker, args=(index, func, fargs)))
        self.threads[-1].start()


    def get_results(self, return_dict=False):
        """wait for all threads to finish, sort by thread index, then return results

        Args:
            return_dict (bool): Return a dict instead of a list. The threaded
                function's first argument is used as the key.
        """

        for thread in self.threads:
            thread.join()

        thread_data = []
        while not self.result_queue.empty():
            thread_data.append(self.result_queue.get())
        thread_data.sort(key=lambda item: item[0])

        if return_dict:
            results = {}
            for _, key, thread_return in thread_data:
                results[key] = thread_return
        else:
            results = []
            for _, _, thread_return in thread_data:
                results.append(thread_return)

        return results
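Since the index in this version exists only to restore insertion order, here is an alternative sketch (not taken from either post) that skips the queue and the sort: add_thread gives each thread a private result holder and records it in insertion order, and get_results reads the holders back after joining. OrderedThreader, slow_upper and the internal names are hypothetical.

```python
import time
from threading import Thread


class OrderedThreader:
    """Sketch: collect threaded results in insertion order without a queue."""

    def __init__(self):
        self._calls = []  # (key, result_holder, thread), in insertion order

    @staticmethod
    def _worker(holder, func, fargs):
        # Each worker writes only to its own holder dict.
        holder["result"] = func(*fargs)

    def add_thread(self, func, fargs):
        holder = {}
        thread = Thread(target=self._worker, args=(holder, func, fargs))
        self._calls.append((fargs[0], holder, thread))
        thread.start()

    def get_results(self, return_dict=False):
        for _, _, thread in self._calls:
            thread.join()
        # If a function raised, its holder stays empty and .get() gives None.
        if return_dict:
            return {key: holder.get("result") for key, holder, _ in self._calls}
        return [holder.get("result") for _, holder, _ in self._calls]


def slow_upper(text, delay):
    time.sleep(delay)
    return text.upper()


threader = OrderedThreader()
threader.add_thread(slow_upper, ("a", 0.2))  # sleeps longest
threader.add_thread(slow_upper, ("b", 0.0))
print(threader.get_results())                  # ['A', 'B']
print(threader.get_results(return_dict=True))  # {'a': 'A', 'b': 'B'}
```

The order comes from the list built in add_thread, so it does not depend on which thread finishes first.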
TakingItCasual
  • [The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads.](https://docs.python.org/3/library/queue.html). Passing threads over a queue sounds strange. –  Apr 24 '18 at 19:30
  • When I created the class I just needed some way to get the returns of threaded functions. [I took inspiration from here](https://stackoverflow.com/a/36926134/2868017). If you're familiar with a more pythonic way of doing the same thing I'd like a link to a tutorial for said method. – TakingItCasual Apr 24 '18 at 19:41
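As for a more pythonic way: the standard library's concurrent.futures module covers this pattern directly. A minimal sketch, assuming the same (func, args) pairs as above; run_all is a hypothetical helper name. Futures are collected in submission order, so no index or sort is needed, and future.result() re-raises any exception the function raised instead of silently losing that result.

```python
from concurrent.futures import ThreadPoolExecutor


def run_all(calls, return_dict=False):
    """Run each (func, args) pair on a thread pool and collect the returns.

    Results follow submission order. With return_dict=True, each call's
    first argument is used as its key.
    """
    calls = list(calls)
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(func, *args) for func, args in calls]
        # result() blocks until that call finishes and re-raises its exception.
        results = [future.result() for future in futures]
    if return_dict:
        return {args[0]: result for (_, args), result in zip(calls, results)}
    return results


print(run_all([(pow, (2, 3)), (pow, (3, 2))]))                    # [8, 9]
print(run_all([(pow, (2, 3)), (pow, (3, 2))], return_dict=True))  # {2: 8, 3: 9}
```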


If you only want to achieve the result outlined in the first paragraph, where you use the first argument as the key, you can modify your code to do it like this:

from queue import Queue
from threading import Thread

class Threader(object):
    """thread arbitrary number of functions, then block when results wanted

    Attributes:
        results_queue (Queue): The thread-safe queue that holds the results.
        threads (Thread list): Threads of functions added with add_thread.
    """

    def __init__(self):
        self.results_queue = Queue()
        self.threads = []

    def worker(self, func, args):
        """run the function and save its results"""
        result = func(*args)
        # save result, along with a key for use later if needed (first argument)
        self.results_queue.put([args[0], result])

    def add_thread(self, func, fargs):
        """add a function to be threaded"""
        self.threads.append(Thread(target=self.worker, args=(func, fargs)))
        self.threads[-1].start()

    def get_results(self, return_dict=False):
        """block threads until all are done, then return their results

        Args:
            return_dict (bool): Return a dict instead of a list. The first
                argument passed to each threaded function is used as the key.
        """
        for thread in self.threads:
            thread.join()

        if return_dict:
            results = {}
            while not self.results_queue.empty():
                # set the dictionary key as first argument passed to worker
                key, value = self.results_queue.get()
                results[key] = value
        else:
            results = []
            while not self.results_queue.empty():
                # only the value is needed when returning a list
                _, value = self.results_queue.get()
                results.append(value)

        return results

NB: it's not necessary to store the threads themselves in the queue, just the results. (A queue is a good choice for storing the results, as it handles the synchronization of concurrent access.)
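To illustrate the point about synchronization, a small standalone sketch (the square function is purely illustrative): fifty threads put onto one shared Queue without any explicit lock, and after joining them the main thread drains it with get_nowait() until queue.Empty is raised.

```python
from queue import Empty, Queue
from threading import Thread


def square(results_queue, n):
    # Queue.put is thread-safe, so many threads can call it at once
    # without an explicit lock.
    results_queue.put((n, n * n))


results_queue = Queue()
threads = [Thread(target=square, args=(results_queue, n)) for n in range(50)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Every thread has finished, so the queue holds exactly one item per thread.
collected = []
while True:
    try:
        collected.append(results_queue.get_nowait())
    except Empty:
        break

# Items arrive in completion order, which need not match start order.
collected.sort()
print(len(collected), collected[:3])  # 50 [(0, 0), (1, 1), (2, 4)]
```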

In the worker() function you can generate the key however you like; in the code above I've used the first argument as you've suggested.
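As a sketch of that flexibility, the key generation can be passed in as a parameter. run_keyed and key_func are hypothetical names, and this is a function rather than the class above, to keep it short; the default key_func reproduces the first-argument behaviour.

```python
from queue import Queue
from threading import Thread


def run_keyed(calls, key_func=lambda func, args: args[0]):
    """Run each (func, args) pair in its own thread and return {key: result}.

    key_func is a hypothetical hook that receives the function and its
    arguments and returns the key; the default uses the first argument.
    """
    results_queue = Queue()

    def worker(func, args):
        # The key is generated inside the worker, as in the answer's code.
        results_queue.put((key_func(func, args), func(*args)))

    threads = [Thread(target=worker, args=(func, args)) for func, args in calls]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    results = {}
    while not results_queue.empty():
        key, value = results_queue.get()
        results[key] = value
    return results


def add(a, b):
    return a + b


by_first_arg = run_keyed([(add, (1, 2)), (add, (10, 20))])
by_func_name = run_keyed([(add, (1, 2)), (pow, (2, 5))],
                         key_func=lambda func, args: func.__name__)
print(by_first_arg == {1: 3, 10: 30})          # True
print(by_func_name == {"add": 3, "pow": 32})  # True
```

If two calls produce the same key, the one whose result is dequeued last wins.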

A usage example would be:

def foo(*args):
    return "foo() " + repr(len(args))

def bar(*args):
    return "bar() " + repr(len(args))

def baz(*args):
    return "baz() " + repr(len(args))

threader = Threader()

threader.add_thread(foo, ["foo_key", "a"])
threader.add_thread(bar, ["bar_key", "b", "c"])
threader.add_thread(baz, ["baz_key", "d", "e", "f"])

print(threader.get_results(return_dict=True))

This gives the output:

{'foo_key': 'foo() 2', 'bar_key': 'bar() 3', 'baz_key': 'baz() 4'}

Hope this helps.

AS Mackay
  • This is the better answer. There's just one mistake I want you to fix before I accept it. args should be expanded when passing it to func in the worker method (func(args) -> func(*args)). I got "unhashable type: 'list'" exceptions before applying the fix myself. – TakingItCasual Apr 25 '18 at 16:41
  • OK sure, if the functions passed to the worker method are using the *args idiom then that needs to be supported. I'll mod the answer as per your suggestion. – AS Mackay Apr 25 '18 at 20:29
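The fix discussed in these comments is argument unpacking. A short standalone illustration (describe is a hypothetical function using the *args idiom, like foo, bar and baz above): calling func(args) passes the whole list as a single argument, so args[0] inside the function is the list itself, and using it as a dictionary key is one way to end up with the "unhashable type: 'list'" error reported above.

```python
def describe(*args):
    # With the *args idiom, args is a tuple of all positional arguments.
    return len(args), args[0]


fargs = ["foo_key", "a"]
print(describe(fargs))   # (1, ['foo_key', 'a']): the whole list is one argument
print(describe(*fargs))  # (2, 'foo_key'): the list is unpacked into two arguments

# Without unpacking, args[0] is the list itself, and lists can't be dict keys.
try:
    {describe(fargs)[1]: "value"}
except TypeError as exc:
    print(exc)  # the message mentions: unhashable type: 'list'
```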