Below I have the Threader class, which I use to thread an arbitrary number of functions, then return a list of the threaded functions' returns after joining the threads. One feature I want is the option to return a dictionary instead of a list. I found one method for doing so by requiring threaded functions to return a tuple. The tuple's first value would then be used for the key. I want to instead have it so that the threaded function's first argument is used as the key.
I learned that threads can be named, so I've set the names to be set as the threads' function's first argument on thread creation. The thread itself can access the name with getName(), but how would I get the name of the thread next in line to be .get() from the queue? (How do I access the thread objects within the queue?)
I just need it to work as described in the first paragraph, so I'm open to alternative methods of achieving the same effect.
from queue import Queue
from threading import Thread
class Threader(object):
"""thread arbitrary number of functions, then block when results wanted
Attributes:
thread_queue (Queue): The queue that holds the threads.
threads (Thread list): Threads of functions added with add_thread.
"""
def __init__(self):
self.thread_queue = Queue()
self.threads = []
def add_thread(self, func, args):
"""add a function to be threaded"""
self.threads.append(Thread(
name=args[0], # Custom name using function's first argument
target=lambda queue, func_args: queue.put(func(*func_args)),
args=(self.thread_queue, args)))
self.threads[-1].start()
def get_results(self, return_dict=False):
"""block threads until all are done, then return their results
Args:
return_dict (bool): Return a dict instead of a list. Requires
each thread to return a tuple with two values.
"""
for thread in self.threads:
thread.join()
if return_dict:
results = {}
while not self.thread_queue.empty():
# Setting the dictionary key with returned tuple
# How to access thread's name?
key, value = self.thread_queue.get()
results[key] = value
else:
results = []
while not self.thread_queue.empty():
results.append(self.thread_queue.get())
return results
Example usage:
threader = Threader()
for region in regions:
# probe_region is a function, and (region, tag_filter) are args for it
threader.add_thread(probe_region, (region, tag_filter))
results = threader.get_results()
Edit: What I currently use:
My cleaned up and improved version of Mackay's answer (return is sorted by thread insertion):
from queue import Queue
from threading import Thread
class Threader(object):
"""thread arbitrary number of functions, then block when results wanted
Attributes:
result_queue (Queue): Thread-safe queue that holds the results.
threads (list[Thread]): Threads of functions added with add_thread.
"""
def __init__(self):
self.result_queue = Queue()
self.threads = []
def worker(self, func, fargs):
"""insert threaded function into queue to make its return retrievable
The index of the thread and the threaded function's first arg are
inserted into the queue, preceding the threaded function itself.
Args: See add_thread
"""
return self.result_queue.put([
len(self.threads), fargs[0], func(*fargs)])
def add_thread(self, func, fargs):
"""add a function to be threaded
Args:
func (function): Function to thread.
fargs (tuple): Argument(s) to pass to the func function.
Raises:
ValueError: If func isn't callable, or if fargs not a tuple.
"""
if not callable(func):
raise ValueError("func must be a function.")
if not isinstance(fargs, tuple) or not fargs:
raise ValueError("fargs must be a non-empty tuple.")
self.threads.append(Thread(target=self.worker, args=(func, fargs)))
self.threads[-1].start()
def get_results(self, return_dict=False):
"""block all threads, sort by thread index, then return thread results
Args:
return_dict (bool): Return dict instead of list. Threads'
function's first argument used as key.
"""
for thread in self.threads:
thread.join()
thread_data = []
while not self.result_queue.empty():
thread_data.append(self.result_queue.get())
thread_data.sort(key=lambda thread_index: thread_index[0])
if return_dict:
results = {}
for _, key, thread_return in thread_data:
results[key] = thread_return
else:
results = []
for _, _, thread_return in thread_data:
results.append(thread_return)
return results