
This may have been asked in a similar context but I was unable to find an answer after about 20 minutes of searching, so I will ask.

I have written a Python script (let's say scriptA.py) and another script (let's say scriptB.py).

In scriptB I want to call scriptA multiple times with different arguments. Each run takes about an hour (it's a huge script, does lots of stuff, don't worry about it), and I want to be able to run scriptA with all the different arguments simultaneously, but I need to wait until ALL of them are done before continuing. My code:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

I want to run all of the subprocess.call() invocations at the same time and then wait until they are all done. How should I do this?

I tried to use threading like the example here:

from threading import Thread
import subprocess

def call_script(args):
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA,))
t2 = Thread(target=call_script, args=(scriptA + argumentsB,))
t3 = Thread(target=call_script, args=(scriptA + argumentsC,))
t1.start()
t2.start()
t3.start()

But I do not think this is right.

How do I know they have all finished running before going to my do_finish()?

Inbar Rose

9 Answers


Put the threads in a list and then use the join method:

threads = []

t = Thread(...)
threads.append(t)

# ...repeat as often as necessary...

# Start all threads
for x in threads:
    x.start()

# Wait for all of them to finish
for x in threads:
    x.join()
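For the many-threads case raised in the comments below, the list can be built in a loop rather than naming each thread individually. A minimal runnable sketch (a short sleep stands in for the hour-long subprocess.call; the argument lists are illustrative):

```python
import threading
import time

results = []  # filled in by the worker threads

def call_script(args):
    # stand-in for subprocess.call(args)
    time.sleep(0.1)
    results.append(args)

# twenty argument sets instead of t1...t20 variables
arg_sets = [["scriptA.py", "--job", str(n)] for n in range(20)]

threads = [threading.Thread(target=call_script, args=(argv,)) for argv in arg_sets]
for t in threads:
    t.start()
for t in threads:
    t.join()  # blocks until every thread is done

print(len(results))  # 20: all calls completed before this line
```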
Aaron Digulla
  • thanks you. this is a cleaner way to do it, but i am marking negai as the correct answer. – Inbar Rose Aug 15 '12 at 12:12
  • could i technically do `threads.append(Thread(target=call_script, args=(scriptA + argumentsA)))` ? and skip a bunch of code? – Inbar Rose Aug 15 '12 at 12:14
  • 1
    Yes, that would work but is harder to understand. You should always try to find a balance between compact code and "readability". Remember: Code is written once but read many times. So it's more important that it's easy to understand. – Aaron Digulla Aug 15 '12 at 12:20
  • yes, but i dont only have 3 threads, i have like 20. so it would be 20 lines of code to set t1...t20 and then 20 lines to add them to threads... instead of just 20 lines... ? – Inbar Rose Aug 15 '12 at 12:22
  • Maybe use a factory which creates these and collects them in a list? – Aaron Digulla Aug 15 '12 at 12:28
  • a what? please explain - put it in a separate answer or modify this one ? – Inbar Rose Aug 15 '12 at 12:29
  • 2
    The "factory pattern" isn't something I can explain in one sentence. Google for it and search stackoverflow.com. There are many examples and explanations. In a nutshell: You write code which builds something complex for you. Like a real factory: You give in an order and get a finished product back. – Aaron Digulla Aug 15 '12 at 12:34
  • however interesting factory patterns are, i dont think it has anything to do with my code, or what i asked, but thank you for showing it to me anyway. – Inbar Rose Aug 15 '12 at 12:41
  • @IoanAlexandruCucu: Often, you need to do something between starting the threads and waiting for them to finish. That's why I used two comprehensions. – Aaron Digulla Jan 29 '14 at 08:11
  • 3
    @Aaron DIgull I understand that.What I mean is that I would just do a `for x in threads: x.join()` rather than using list comprehantion – Ioan Alexandru Cucu Jan 29 '14 at 09:59
  • 2
    @IoanAlexandruCucu: I'm still wondering if there is a more readable and efficient solution: http://stackoverflow.com/questions/21428602/how-inefficient-is-a-list-comprehension-if-you-dont-assign-it – Aaron Digulla Jan 29 '14 at 11:01
  • one x.join will block the next x.join in the loop? – Dee Oct 22 '21 at 08:33

You need to use the join method of the Thread object at the end of the script.

t1 = Thread(target=call_script, args=(scriptA + argumentsA,))
t2 = Thread(target=call_script, args=(scriptA + argumentsB,))
t3 = Thread(target=call_script, args=(scriptA + argumentsC,))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

Thus the main thread will wait until t1, t2, and t3 finish execution.

Maksim Skurydzin
  • 7
    hmmm - having trouble understanding something, wont this first run t1, wait till its finish, then go to t2..etc,etc ? how do make it all happen at once? i dont see how this would run them at the same time? – Inbar Rose Aug 15 '12 at 12:01
  • 35
    The call to `join` blocks until thread finishes execution. You will have to wait for all of the threads anyway. If `t1` finishes first you will start waiting on `t2` (which might be finished already and you will immediately proceed to wait for `t3`). If `t1` took the longest to execute, when you return from it both `t1` and `t2` will return immediately without blocking. – Maksim Skurydzin Aug 15 '12 at 12:06
  • 1
    you dont understand my question - if i copy the above code to my code - will it work? or am i missing something? – Inbar Rose Aug 15 '12 at 12:06
  • 1
    sorry, I accidentally missed the part that actually starts threads' execution (updated the answer). Yes, the code, as it is now, will work as you expect it. – Maksim Skurydzin Aug 15 '12 at 12:08
  • 2
    okay, i see. now i understand, was a bit confused about it but i think i understand, `join` sort of attaches the current process to the thread and waits till its done, and if t2 finishs before t1 then when t1 is done it will check for t2 being done see that it is, and then check t3..etc..etc.. and then only when all are done it will continue. awesome. – Inbar Rose Aug 15 '12 at 12:11
  • 4
    say t1 takes the longest, but t2 has an exception. what happens then? can you catch that exception or check whether t2 finished ok or not? – Ciprian Tomoiagă May 04 '14 at 15:27
  • i think use the `for` loop is better. but your answer is ok. – Hamidreza Khorammfar Sep 23 '21 at 01:59
  • but t2.join has to wait for t1, t3.join has to wait for t2? – Dee Oct 22 '21 at 08:33
  • 1
    @datdinhquoc you intend to wait for all the threads' completion anyway. If the thread is already completed when you call join, the call should be very fast and non-blocking. Thus the overall wait time should be dominated by the slowest thread's execution. – Maksim Skurydzin Oct 25 '21 at 14:40
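On the exception question raised in the comments: `join()` returns normally even if the thread died with an exception; nothing is re-raised in the main thread. A small sketch of one common workaround, recording failures in a shared list (the `errors` list and `worker` function here are illustrative, not part of the original question):

```python
import threading

errors = []  # shared list the workers report failures into

def worker():
    try:
        raise ValueError("boom")  # simulate a failure inside the thread
    except Exception as exc:
        errors.append(exc)        # record it for the main thread to inspect

t = threading.Thread(target=worker)
t.start()
t.join()  # returns normally; the exception did not propagate here

if errors:
    print("a worker failed:", errors[0])
```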

Since Python 3.2 there is a new approach to reach the same result, which I personally prefer to the traditional thread create/start/join: the concurrent.futures package: https://docs.python.org/3/library/concurrent.futures.html

Using a ThreadPoolExecutor the code would be:

from concurrent.futures import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    for ordinal, arg in enumerate(args, start=1):
        executor.submit(call_script, ordinal, arg)
print('All tasks have been finished')

The output of the previous code is something like:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks have been finished

One of the advantages is that you can control the throughput by setting the maximum number of concurrent workers.

To use multiprocessing instead, you can use ProcessPoolExecutor.
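As a later comment points out, `concurrent.futures.wait` can also be used to block explicitly on the submitted futures, which additionally lets you inspect each result (or exception) afterwards. A sketch along those lines (the body of `call_script` is a stand-in for the real subprocess call):

```python
from concurrent.futures import ThreadPoolExecutor, wait
import time

def call_script(arg):
    # stand-in for the real subprocess call
    time.sleep(0.1)
    return arg.upper()

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(call_script, a) for a in args]
    done, not_done = wait(futures)  # blocks until every future completes

results = sorted(f.result() for f in futures)
print(results)  # ['ARGUMENTSA', 'ARGUMENTSB', 'ARGUMENTSC']
```

Calling `f.result()` also re-raises any exception the task raised, which is a practical way to detect a failed job.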

Roberto
  • but how can you tell when all threads in the threadpool have finished? – K-Dawg Oct 14 '17 at 16:14
  • 4
    As you can see in the example, the code after the `with` statement is executed when all task have finished. – Roberto Oct 17 '17 at 19:32
  • this doesn't work. Try doing something really long in threads. Your print statement will execute before thread finishes – Pranalee Jan 05 '19 at 07:17
  • 2
    @Pranalee, That code works, I've updated the code to add the output lines. You cannot see the "All task..." before all threads are finished, That is how the `with` statement works by design in this case. Anyway, you can always open a new question in SO and post your code so we can help you to find out what is happening in your case. – Roberto Jan 05 '19 at 11:56
  • 3
    @PrimeByDesign you can use `concurrent.futures.wait` function, you can see a [real example here](https://github.com/shaftoe/api-l3x-in/blob/b82e3a888ecff261bcae8ffd7c8a9846ba616391/lib/stacks/pagespeed/lambdas/pagespeed_poller.py#L69) Official docs: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait – Alexander Fortin May 21 '20 at 16:35
  • This does work, because base `Executor` explicitly calls `self.shutdown(wait=True)` in its `__exit__`. – frnhr Nov 29 '20 at 08:02
  • BE AWARE for a long args - you will end up with tons of memory usage - consider using semaphore. – Sion C May 24 '23 at 16:29

I prefer using list comprehension based on an input list:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i,)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]
Adam Matan
  • 2
    Checked answer explains well but this one is shorter and doesn't require ugly repetitions. Just a nice answer. :) – tleb Jul 15 '16 at 17:32
  • 1
    List comprehension just for side effects is usually depreciated*. But in this use case, it seems to be a good idea. *http://stackoverflow.com/questions/5753597/is-it-pythonic-to-use-list-comprehensions-for-just-side-effects – Vinayak Kaniyarakkal May 15 '17 at 13:36
  • 6
    @VinayakKaniyarakkal `for t in threads:t.start()` isn't it better? – Smart Manoj May 23 '18 at 14:38

You can have a class like the one below, to which you can add 'n' functions or console_scripts that you want to execute in parallel fashion, then start the execution and wait for all jobs to complete.

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for the given function delegates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits until all the functions have executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()
PBD
  • This is multiprocessing. Question was about https://docs.python.org/3/library/threading.html – Rustam A. Jul 10 '20 at 16:18
  • I can recomment multiprocessing too, but not this way. If you just create processes and not care about system resources it can froze system. If you don't know number of processes it is better to use .map – S__ May 31 '22 at 01:21

I just came across the same problem, where I needed to wait for all the threads created in a for loop. I tried out the following piece of code. It may not be the perfect solution, but I thought it would be a simple one to test:

import threading

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in str(err):
            continue
        else:
            raise
Omkar

From the threading module documentation

There is a “main thread” object; this corresponds to the initial thread of control in the Python program. It is not a daemon thread.

There is the possibility that “dummy thread objects” are created. These are thread objects corresponding to “alien threads”, which are threads of control started outside the threading module, such as directly from C code. Dummy thread objects have limited functionality; they are always considered alive and daemonic, and cannot be join()ed. They are never deleted, since it is impossible to detect the termination of alien threads.

So, to catch those two cases when you are not interested in keeping a list of the threads you create:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

Whereupon:

>>> print(data)
[0, 4, 12, 40]
berna1111

Maybe, something like

for t in threading.enumerate():
    if t.daemon:
        t.join()
jno
  • I have tried this code but not sure about its working because the last instruction of my code was printed which was after this for loop and still the process was not terminated. – Omkar Mar 14 '18 at 13:41

Using only join can result in a false-positive interaction with the thread. As the docs say:

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call isAlive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

and an illustrative piece of code:

threads = []
for name in some_data:
    new = threading.Thread(
        target=self.some_func,
        args=(name,)
    )
    threads.append(new)
    new.start()
    
over_threads = iter(threads)
curr_th = next(over_threads)
while True:
    curr_th.join(1.0)  # a timeout, so the is_alive() check below is meaningful
    if curr_th.is_alive():
        continue
    try:
        curr_th = next(over_threads)
    except StopIteration:
        break
xillmera