3

I am writing a function that copies a file from the local machine to remote servers, creating one thread per server so that the SFTP transfers run in parallel.

def copyToServer(hostname, username, password, destPath, localPath):
    # copies the file to the given host using the supplied credentials
    ...

for i in hostsList:
    hostname = i
    username = defaultLogin
    password = defaultPassword
    thread = threading.Thread(target=copyToServer, args=(hostname, username, password, destPath, localPath))
    threadsArray.append(thread)
    thread.start()

This starts one thread per server and the copies run in parallel, but I want to limit it to about 50 threads at a time, since the total number of servers could be very large.

Niati Arora
  • Does this answer your question? [How to limit number of concurrent threads in Python?](https://stackoverflow.com/questions/18347228/how-to-limit-number-of-concurrent-threads-in-python) – feetwet Aug 29 '23 at 01:37

3 Answers

14

Your threads need to share a common value that tracks how many of them are currently running.

This can be done with a Semaphore object. It holds an internal counter, initialised to your chosen maximum, which every thread tries to acquire. Each acquire decrements the counter and each release increments it. Once the counter reaches zero, any further thread that tries to acquire it blocks until another thread releases.

A short example with a maximum of 5 parallel threads shows that half of the 10 threads run immediately while the others block and wait:

import threading
import time

maxthreads = 5
sema = threading.Semaphore(value=maxthreads)
threads = list()

def task(i):
    # the with-block acquires a slot and always releases it, even on error
    with sema:
        print("start %s" % (i,))
        time.sleep(2)

for i in range(10):
    # args must be a tuple, hence the trailing comma
    thread = threading.Thread(target=task, args=(i,))
    threads.append(thread)
    thread.start()

The output:

start 0
start 1
start 2
start 3
start 4

After about two seconds the first threads finish and the remaining threads run:

start 5
start 6
start 7
start 8
start 9
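
A minimal sketch of how the same semaphore idea might be applied to the upload loop from the question, with a limit of 50. The names `copy_to_server`, `limited_copy` and `copy_all` are hypothetical, and the SFTP transfer is replaced by a short sleep, so this only illustrates the throttling, not the actual copy:

```python
import threading
import time

MAX_PARALLEL = 50  # the limit mentioned in the question
slots = threading.BoundedSemaphore(MAX_PARALLEL)

def copy_to_server(hostname):
    # Hypothetical stand-in for the real SFTP upload.
    time.sleep(0.01)
    return hostname

def limited_copy(hostname, results):
    # The with-block releases the slot even if the upload raises.
    with slots:
        results.append(copy_to_server(hostname))

def copy_all(hosts):
    results = []
    threads = [threading.Thread(target=limited_copy, args=(h, results))
               for h in hosts]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Note that this still creates one thread object per host; the semaphore only limits how many of them are inside the upload at any moment.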
admirableadmin
  • This should be the accepted answer. Works great. By the way, is there a way to add a priority to each task with this solution? I have some tasks that should be able to acquire the semaphore ahead of others. – Z T Nov 20 '17 at 20:46
  • There is no point in having threads.append(thread). Also, this solution will not work with `daemon` threads. – Beliaev Maksim Feb 05 '21 at 11:15
2

For those looking for a quick fix to limit the number of threads with the `threading` module in Python 3: the basic idea is to wrap the main function in a wrapper that contains the stop/go logic, and to run that wrapper in the threads instead.

The code below reuses the solution proposed by Andpei. The verbatim code from that post did not work for me; my modified version, which did, follows.

Python3:

import threading
import time

maxthreads = 3
smphr = threading.Semaphore(value=maxthreads)
threads = list()

SomeInputCollection=("SomeInput1","SomeInput2","SomeInput3","SomeInput4","SomeInput5","SomeInput6")

def yourmainfunction(SomeInput):
    #main function
    print ("Your input was: "+ SomeInput)

def task(SomeInput):
    #yourmainfunction wrapped in a task
    print(threading.current_thread().name, 'Starting')
    # the with-block releases the semaphore even if yourmainfunction raises
    with smphr:
        yourmainfunction(SomeInput)
        time.sleep(2)
        print(threading.current_thread().name, 'Exiting')


def main():
    threads = [threading.Thread(name="worker/task", target=task, args=(SomeInput,)) for SomeInput in SomeInputCollection]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()

Output:

worker/task Starting
Your input was: SomeInput1
worker/task Starting
Your input was: SomeInput2
worker/task Starting
Your input was: SomeInput3
worker/task Starting
worker/task Starting
worker/task Starting
worker/task Exiting
Your input was: SomeInput4
worker/task Exiting
worker/task Exiting
Your input was: SomeInput6
Your input was: SomeInput5
worker/task Exiting
worker/task Exiting
worker/task Exiting
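
The wrapper idea described above can also be written once and reused. The following is a rough sketch, assuming a hypothetical decorator named `limit_concurrency`; `process` and `run` are placeholder names for illustration only:

```python
import functools
import threading

def limit_concurrency(max_threads):
    """Return a decorator that lets at most max_threads calls run at once."""
    gate = threading.BoundedSemaphore(max_threads)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Blocks here while max_threads calls are already in progress.
            with gate:
                return func(*args, **kwargs)
        return wrapper
    return decorator

@limit_concurrency(3)
def process(item, out):
    # Placeholder for the real work.
    out.append(item.upper())

def run(items):
    out = []
    threads = [threading.Thread(target=process, args=(item, out))
               for item in items]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(out)
```

Each decorated function gets its own semaphore, so the stop/go logic stays out of the function body.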
J.Paul
0
#!/usr/bin/python
# -*- coding: utf-8 -*-
import time
from threading import Thread, active_count
from random import uniform # get some random time

thread_list = []
names = ['Alfa', ' Bravo', ' Charlie', ' Delta', ' Echo', ' Foxtrot', ' Golf', ' Hotel', ' India', ' Juliett', ' Kilo', ' Lima']
#-------------------------------------------------------------------------

def testFunction(inputName):
    waitTime = uniform(0.987, 2.345) # Random time between 0.987 and 2.345 seconds
    time.sleep(waitTime)
    print ('Finished working on name: ' + inputName)
#-------------------------------------------------------------------------

n_threads = 4 # define max child threads. 
for list_names in names:

    print ( 'Launching thread with name: ' + list_names )
    t = Thread(target=testFunction, args=(list_names,))
    thread_list.append(t)
    t.start()

    while active_count() > n_threads: # max thread count (includes parent thread)
        print ( '\n == Current active threads ==: ' + str(active_count()-1) )
        time.sleep(1) # block until active threads are less than 4

for ex in thread_list: # wait for all threads to finish
    ex.join()
#-------------------------------------------------------------------------
print ( '\n At this point we continue on main thread \n' )

This should give you something like this:

# time ./threads.py
Launching thread with name: Alfa
Launching thread with name:  Bravo
Launching thread with name:  Charlie
Launching thread with name:  Delta

== Current active threads ==: 4

== Current active threads ==: 4
Finished working on name:  Bravo
Finished working on name:  Delta
Finished working on name: Alfa
Finished working on name:  Charlie
Launching thread with name:  Echo
Launching thread with name:  Foxtrot
Launching thread with name:  Golf
Launching thread with name:  Hotel

== Current active threads ==: 4

== Current active threads ==: 4
Finished working on name:  Hotel
Finished working on name:  Foxtrot
Launching thread with name:  India
Launching thread with name:  Juliett

== Current active threads ==: 4
Finished working on name:  Echo
Finished working on name:  Golf
Launching thread with name:  Kilo
Launching thread with name:  Lima

== Current active threads ==: 4
Finished working on name:  India
Finished working on name:  Juliett
Finished working on name:  Lima
Finished working on name:  Kilo

At this point we continue on main thread


real    0m6.945s
user    0m0.034s
sys     0m0.009s
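
Polling `active_count()` works, but the standard library also offers `concurrent.futures.ThreadPoolExecutor`, which keeps a fixed number of worker threads and queues the remaining tasks. This is a different technique from the one above, shown here only as a sketch; `work` and `run_pool` are hypothetical names and the task body is a placeholder:

```python
from concurrent.futures import ThreadPoolExecutor

def work(name):
    # Placeholder task; a real one would do the upload or computation.
    return "Finished working on name: " + name

def run_pool(names, n_threads=4):
    # At most n_threads tasks run at once; the rest wait in the pool's queue.
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        # map() yields results in the same order as the inputs.
        return list(pool.map(work, names))
```

Leaving the `with` block waits for all submitted tasks, so no explicit `join()` loop is needed.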
mjoao