
I'm making remote API calls on threads, without joining them, so the program can start the next API call without waiting for the last one to complete.

Like so:

from threading import Thread

def run_single_thread_no_join(function, args):
    thread = Thread(target=function, args=(args,))
    thread.start()
    return
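(For reference, threads started this way can still be waited on by keeping the Thread objects and joining them at the end; a minimal sketch, with a hypothetical fake_api_call standing in for the real request:)

```python
from threading import Thread
import time

results = []

def fake_api_call(n):
    # Hypothetical stand-in for a remote API call.
    time.sleep(0.01)
    results.append(n)

threads = []
for i in range(5):
    t = Thread(target=fake_api_call, args=(i,))
    t.start()           # all five threads run concurrently...
    threads.append(t)

for t in threads:
    t.join()            # ...and we block here until every call has finished

print(sorted(results))
```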

The problem is that I need to know when all the API calls have completed, so I moved to code that uses a queue and join.

The threads now seem to run serially.

I can't figure out how to use join so that the threads still execute in parallel.

What am I doing wrong?

import threading
from queue import Queue

def run_que_block(methods_list, num_worker_threads=10):
    '''
    Runs methods on threads.  Stores method returns in a list, then outputs
    that list after all the methods in the list have completed.

    :param methods_list: example ((method_name, args), (method_2, args), (method_3, args))
    :param num_worker_threads: The number of threads to use in the block.
    :return: The full list of returns from each method.
    '''

    method_returns = []

    # log = StandardLogger(logger_name='run_que_block')

    # lock to serialize console output
    lock = threading.Lock()

    def _output(item):
        # Make sure the whole print completes, or threads can mix up output on one line.
        with lock:
            if item:
                print(item)
            msg = threading.current_thread().name, item
            # log.log_debug(msg)

        return

    # The worker thread pulls an item from the queue and processes it.
    def _worker():
        while True:
            item = q.get()
            if item is None:
                break

            method_returns.append(item)
            _output(item)

            q.task_done()

    # Create the queue and thread pool.
    q = Queue()

    threads = []
    # Start the worker threads.
    for i in range(num_worker_threads):
        t = threading.Thread(target=_worker)
        t.daemon = True  # thread dies when the main thread (only non-daemon thread) exits
        t.start()
        threads.append(t)

    for method in methods_list:
        q.put(method[0](*method[1]))

    # block until all tasks are done
    q.join()

    # stop the workers
    for i in range(num_worker_threads):
        q.put(None)
    for t in threads:
        t.join()

    return method_returns
Emily

1 Answer

You're doing all the work in the main thread:

for method in methods_list:
    q.put(method[0](*method[1]))

Assuming each entry in methods_list is a callable and a sequence of arguments for it, this calls every function in the main thread and only puts each result in the queue. That leaves nothing to parallelize except the printing, which is generally not a big enough cost to justify the thread/queue overhead.

Presumably, you want the threads to do the work for each function, so change that loop to:

for method in methods_list:
    q.put(method)  # Don't call it, queue it to be called in worker

and change the _worker function so it calls the function that does the work in the thread:

def _worker():
    while True:
        item = q.get()
        if item is None:        # Sentinel value: shut this worker down
            q.task_done()
            break

        method, args = item     # Extract and unpack callable and arguments
        result = method(*args)  # Call the callable here, in the worker thread

        method_returns.append(result)
        _output(result)

        q.task_done()
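Putting both changes together, a runnable sketch of the corrected pattern looks like this (slow_double is a hypothetical stand-in for an API call, and the printing/logging helper is omitted for brevity):

```python
import threading
import time
from queue import Queue

def slow_double(x):
    # Hypothetical stand-in for a remote API call.
    time.sleep(0.01)
    return x * 2

def run_queue_block(methods_list, num_worker_threads=10):
    method_returns = []
    q = Queue()

    def _worker():
        while True:
            item = q.get()
            if item is None:                      # sentinel: shut this worker down
                q.task_done()
                break
            method, args = item                   # unpack callable and arguments
            method_returns.append(method(*args))  # the work happens in the worker
            q.task_done()

    threads = [threading.Thread(target=_worker, daemon=True)
               for _ in range(num_worker_threads)]
    for t in threads:
        t.start()

    for method in methods_list:
        q.put(method)        # queue the callable; don't call it in the main thread

    q.join()                 # block until every queued task is marked done

    for _ in range(num_worker_threads):
        q.put(None)          # one sentinel per worker
    for t in threads:
        t.join()

    return method_returns

returns = run_queue_block([(slow_double, (i,)) for i in range(5)])
print(sorted(returns))
```

Because list.append is atomic in CPython, the workers can share method_returns without an extra lock.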
ShadowRanger