21

The requirement is to start five threads and wait only for the fastest one. All five threads look for the same data in five different directions, and one result is enough to continue the control flow.

Actually, I need to wait for the first two threads to return, so that I can verify their results against each other. But I guess if I know how to wait for the fastest, I can figure out how to wait for the second-fastest.

A lot of answers talk about join(timeout), but you don't know in advance which thread to wait for (that is, which one to call join on).

Peter Mortensen
Tankman六四

6 Answers

27

Use a queue: each thread puts its result on the queue when it completes, and then you just need to read the appropriate number of results and ignore the remainder:

#!python3.3
import queue    # For Python 2.x use 'import Queue as queue'
import threading, time, random

def func(id, result_queue):
    print("Thread", id)
    time.sleep(random.random() * 5)
    result_queue.put((id, 'done'))

def main():
    q = queue.Queue()
    threads = [ threading.Thread(target=func, args=(i, q)) for i in range(5) ]
    for th in threads:
        th.daemon = True
        th.start()

    result1 = q.get()
    result2 = q.get()

    print("Second result: {}".format(result2))

if __name__=='__main__':
    main()

Documentation for Queue.get() (with no arguments it is equivalent to Queue.get(True, None)):

Queue.get([block[, timeout]])

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
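As a rough sketch of the timeout form described above, the helper below collects the first n results but gives up once an overall deadline passes, catching the Empty exception. The names `worker`, `first_n_results` and `demo`, and the delays used, are made up for illustration and are not part of the answer's code.

```python
import queue
import threading
import time

def worker(worker_id, delay, result_queue):
    # Hypothetical worker: sleeps for `delay` seconds, then reports back.
    time.sleep(delay)
    result_queue.put((worker_id, "done"))

def first_n_results(result_queue, n, timeout):
    """Return up to n results, stopping early once `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    results = []
    while len(results) < n:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            results.append(result_queue.get(timeout=remaining))
        except queue.Empty:
            break  # nothing arrived before the deadline
    return results

def demo():
    q = queue.Queue()
    delays = [0.3, 0.05, 0.5, 0.15, 0.4]  # assumed per-thread work times
    for i, delay in enumerate(delays):
        threading.Thread(target=worker, args=(i, delay, q), daemon=True).start()
    return first_n_results(q, 2, timeout=2.0)

if __name__ == "__main__":
    print(demo())
```

Using one shared deadline rather than a fresh timeout per `get()` keeps the total wait bounded, whichever way the results trickle in.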

Duncan
  • 2
    Won't this raise an `Empty` exception if the Queue is empty when you do `q.get()`? – Mike Jul 10 '13 at 16:53
  • 3
    @Michael, the default for `q.get()` is to do a blocking get, so no, it won't throw an exception; instead it will block the main thread until there is a result available. – Duncan Jul 10 '13 at 17:43
  • @Duncan Shouldn't one after the two q.get()'s terminate the active threads in order to finish cleanly? – Peter Mueller Sep 11 '19 at 13:45
  • @PeterMueller for this example it isn't needed: they aren't daemon threads so the script will wait for them to terminate before it exits. For real code though you might well want to wait. – Duncan Sep 11 '19 at 14:43
  • There is no way this works. We only want to wait till the first one is done – Charlie OConor Oct 06 '20 at 03:05
  • 1
    @CharlieOConor if you only want to wait for the first result then call `q.get()` once and use that result. – Duncan Oct 07 '20 at 08:41
7

If you have some sort of processing loop in your threads, the following code uses a threading.Event() to stop all of them as soon as one of them finishes:

import random
import threading
import time

def my_thread(stop_event):
    while not stop_event.is_set():
        # do stuff in a loop (simulated here with a short sleep)
        time.sleep(0.1)

        # some check if stuff is complete (placeholder condition)
        stuff_complete = random.random() < 0.2
        if stuff_complete:
            stop_event.set()
            break

def run_threads():
    # create a thread event
    a_stop_event = threading.Event()

    # spawn the threads
    for x in range(5):
        t = threading.Thread(target=my_thread, args=[a_stop_event])
        t.start()

    # block until one of the threads sets the event
    a_stop_event.wait()

    print("At least one thread is done")

If your process is "cheap" or a single request-response type thread (for instance, an async HTTP request), then Duncan's answer is a good approach.

Community
will-hart
2

You can use an event for this. See http://docs.python.org/2/library/threading.html#event-objects. The idea is that the worker threads set an event when they are finished. The main thread waits for this event before continuing. The worker thread can also set a (mutex-protected) variable to identify itself alongside the event.
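A minimal sketch of that idea, assuming each worker just sleeps for a given delay in place of the real search. The function name `run_race` and the `winner` list are hypothetical; only `threading.Event` and `threading.Lock` come from the standard library.

```python
import threading
import time

def run_race(delays):
    """Start one thread per delay; return the index of the first to finish."""
    done = threading.Event()
    lock = threading.Lock()
    winner = []  # holds at most one index: the first thread to finish

    def worker(index, delay):
        time.sleep(delay)  # stand-in for the real search
        with lock:
            if not winner:  # only the first finisher records itself
                winner.append(index)
        done.set()

    for index, delay in enumerate(delays):
        threading.Thread(target=worker, args=(index, delay), daemon=True).start()

    done.wait()  # the main thread blocks here until some worker sets the event
    with lock:
        return winner[0]

if __name__ == "__main__":
    print("first finisher:", run_race([0.3, 0.05, 0.2]))
```

Because every worker records itself under the lock before setting the event, the main thread always finds a winner once `done.wait()` returns.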

Ludo
2

Duncan's method is probably the best and is what I would recommend. I've been mildly annoyed by the lack of a "wait for the next thread to complete" operation before, though, so I just wrote this up to try it out. It seems to work. Simply use MWThread in place of threading.Thread and you get this new wait_for_thread function.

The global variables are a bit clunky; an alternative would be to make them class-level variables. But if this is hidden in a module (mwthread.py or whatever) it should be fine either way.

#! /usr/bin/env python

# Example of how to "wait for" / join whichever threads is/are done,
# in (more or less) the order they're done.

import threading
from collections import deque

_monitored_threads = []
_exited_threads = deque()
_lock = threading.Lock()
_cond = threading.Condition(_lock)

class MWThread(threading.Thread):
    """
    multi-wait-able thread, or monitored-wait-able thread
    """
    def run(self):
        tid = threading.current_thread()
        try:
            with _lock:
                _monitored_threads.append(tid)
            super(MWThread, self).run()
        finally:
            with _lock:
                _monitored_threads.remove(tid)
                _exited_threads.append(tid)
                _cond.notify_all()

def wait_for_thread(timeout=None):
    """
    Wait for some thread(s) to have finished, with optional
    timeout.  Return the first finished thread instance (which
    is removed from the finished-threads queue).

    If there are no unfinished threads this returns None
    without waiting.
    """
    with _cond:
        if not _exited_threads and _monitored_threads:
            _cond.wait(timeout)
        if _exited_threads:
            result = _exited_threads.popleft()
        else:
            result = None
    return result

def main():
    print('testing this stuff')
    def func(i):
        import time, random
        sleeptime = (random.random() * 2) + 1
        print('thread', i, 'starting - sleep for', sleeptime)
        time.sleep(sleeptime)
        print('thread', i, 'finished')

    threads = [MWThread(target=func, args=(i,)) for i in range(3)]
    for th in threads:
        th.start()
    i = 0
    while i < 3:
        print('main: wait up to .5 sec')
        th = wait_for_thread(.5)
        if th:
            print('main: got', th)
            th.join()
            i += 1
        else:
            print('main: timeout')
    print('I think I collected them all')
    print('result of wait_for_thread():')
    print(wait_for_thread())

if __name__ == '__main__':
    main()
torek
1

Or just keep track of all finished threads in a list and let the second thread to finish signal that whatever is supposed to happen next can go ahead. Python lists are thread-safe for this purpose (in CPython, a single list.append is atomic).

import threading

finished_threads = []
event = threading.Event()

def do_important_stuff():
    pass  # placeholder for the real work

def func():
    do_important_stuff()

    thisthread = threading.current_thread()
    finished_threads.append(thisthread)
    if len(finished_threads) > 1 and finished_threads[1] == thisthread:
        # yay, we are number two!
        event.set()

for i in range(5):
    threading.Thread(target=func).start()

# the main thread blocks until the second thread has finished
event.wait()
Fredrik Håård
  • This doesn't answer the bit about the main thread waiting until two threads are complete and then continuing: instead you've transferred all the remaining activity onto the second thread to complete which may not be what is wanted. – Duncan Jul 10 '13 at 08:17
  • 1
    true; handle_two_threads_done() should probably set on an Event instead. Edited. – Fredrik Håård Jul 10 '13 at 08:27
  • Ummm, Python lists are threadsafe? Really? I thought one needed to use a Queue() for thread coherency! – Jim Dennis Jul 10 '13 at 08:37
  • The lists are safe, their data is not (like other Python builtins). In this case, they are safe because there is no race condition. If you need to synchronize on a list, you need a Queue. I try to avoid having to synchronize though. – Fredrik Håård Jul 10 '13 at 08:45
0

I think if you want to wait you should use the time module. It is part of the standard library, so there is nothing to install.

code:

import time
time.sleep(5)  # seconds