-1

I want five functions to run concurrently using python threading. Each of these functions returns a value. I want to get the return values of these functions once they complete their work. I implemented the below code that waits for threads to join sequentially. In other words, when the code is waiting for thread1 to join, thread2 might complete its work and return a value. But, I want first to get the first thread return value that completed its work and second get the second thread that completed its work, and so on.

threads = []
for user in users:
    task = threading.Thread(target=monitor_wrapper, args=(user[0], user[1]))
    threads.append(task)
    task.start()
    time.sleep(delay / len(users))
for thread in threads:
    result = thread.join()
    print(result)

I read something about Queue regarding this matter.

MatinMoezi
  • 53
  • 1
  • 7
  • You could use [`multiprocessing.pool.ThreadPool`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.ThreadPool) with [`Pool.map`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.map) – Peter Wood Aug 31 '21 at 21:31

3 Answers3

1

Indeed, you can make the threads put their results into a queue (queue.put) while the main thread waits for results from the queue (queue.get). That's a thread-safe class by the way.

Official documentation: https://docs.python.org/3/library/queue.html

bwdm
  • 793
  • 7
  • 17
1

You could use multiprocessing.pool.ThreadPool with its starmap function:

from multiprocessing.pool import ThreadPool

with ThreadPool(len(users)) as pool:
    results = pool.starmap(monitor_wrapper, users)
Peter Wood
  • 23,859
  • 5
  • 60
  • 99
  • It blocks until all the threads were complete. But, I want to get the return value of the first thread at the moment it was finished. After that, for the second thread that was completed and so on. I think `Queue` does this. Thanks. – MatinMoezi Aug 31 '21 at 22:38
  • @MatinMoezi: I was just about to make the same comment but you beat me to it. It's not necessary to use a `Queue` to get the completion time if you use a method other that `starmap` — see [my answer](https://stackoverflow.com/a/69006165/355230) for details. – martineau Aug 31 '21 at 23:29
  • @martineau The threads finishing time doesn't matter to me. I want to get the return value once a thread ends. In other words, I don't want to wait that all threads end to get their return values. – MatinMoezi Aug 31 '21 at 23:42
  • @MatinMoezi: The completion time *would* matter if you wanted to use it to sort the asynchrous results — again not knowing you wanted to terminate all threads early because you left important information out of your question. In fact, it sounds like the opposite of that. – martineau Aug 31 '21 at 23:45
  • 1
    Sorry, I misunderstood the question as it was written, and the question didn't say this other requirement. I'll delete my answer. Bit of a waste of people's time. – Peter Wood Sep 01 '21 at 06:56
  • Peter: I only found out about it when the OP made [this comment](https://stackoverflow.com/questions/69005467/how-to-get-return-values-of-multi-threads-once-one-of-them-is-finished/69005779#comment121958936_69006165). They still haven't added this very important information to their question (other than changing the title slightly). – martineau Sep 02 '21 at 02:30
0

I believe this shows how to do what you want. It uses the appy_async() method that the multiprocessing.pool.ThreadPool class has so it can specify a callback function which provides the opportunity for it to add a finishing time to the results as well as capture what they are. In the sample code the results themselves include the arguments as well as a random result value to make it possible to identify the source of each fake result.

from multiprocessing.pool import ThreadPool
import random
import threading
import time

def monitor_wrapper(*args):
    time.sleep(random.uniform(1.0, 2.0))
    return args + (random.randint(1, 100),)

if __name__ == '__main__':

    results = []
    users = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')]

    def callback(result):
        results.append((result, time.time()))  # Add finish time to result.

    with ThreadPool(processes=3) as pool:
        for user in users:
            pool.apply_async(monitor_wrapper, user, callback=callback)
        pool.close()  # All tasks submitted.
        pool.join()  # Wait for the worker processes to exit.

    results.sort(key=lambda t: t[1])  # Sort results by finish time.

    print('Results in order of finishing time:')
    for result in results:
        print(f'  users: {result[0][:2]}, returned value: {result[0][2]}, '
              f' finishing time: {result[1]}')

Here's some sample output:

Results in order of finishing time:
  users: ('e', 'f'), returned value: 29,  finishing time: 1630452226.8134956
  users: ('c', 'd'), returned value: 67,  finishing time: 1630452226.9075007
  users: ('a', 'b'), returned value: 47,  finishing time: 1630452227.0395086
  users: ('g', 'h'), returned value: 97,  finishing time: 1630452228.1155698
martineau
  • 119,623
  • 25
  • 170
  • 301
  • Thank you for your response. I have implemented what I wanted with `Queue`, and it works well. When a thread ends, it put its return value in a queue and the code waits until a value is put in the queue and then gets it. In this manner, I can terminate all other threads that are still running once I reach the desired return value. But, I think in the callback approach that you mentioned, it is impossible to terminate other threads once the desired value is reached in the callback function. Could you please explain the differences between `Queue` and your approach? – MatinMoezi Aug 31 '21 at 23:39
  • 1
    This approach would not be compatible with wanting to terminate the other threads before they have finished — which is something you really should have mentioned in your question. – martineau Aug 31 '21 at 23:43
  • You can't really terminate a thread. See this [comment](https://stackoverflow.com/questions/65044378/how-to-kill-a-thread-after-n-seconds/65054276#comment114992849_65044378) and my [answer](https://stackoverflow.com/a/65054276/355230) to the question. – martineau Sep 02 '21 at 02:26