
I have a folder on a server that continuously receives files throughout the day. I need to watch the directory and, once a file is received, start some processing on it. Sometimes processing can take a while depending on file size, which can reach up to 20 GB.

I am using concurrent.futures.ThreadPoolExecutor to process multiple files at once, but I need help understanding how to handle the following scenario:

I receive 5 files (4 small and 1 huge) at once, and ThreadPoolExecutor picks up all 5 for processing. It takes a few seconds to process the 4 small files, but 20 minutes to process the large one. Meanwhile, another 10 files are waiting in the folder while the large file is being processed.

I have set max_workers=5, but once the small files are done, only one worker is still running, processing the large file, and the next set of files is blocked until it finishes. How can I start processing the other files while the remaining 4 workers are free?


import os
import time
import random
import concurrent.futures
import datetime
import functools

def process_file(file1, input_num):
    # Stand-in for the real processing: delete the file so the next
    # directory scan does not pick it up again, then simulate the work
    os.remove(os.path.join('C:\\temp\\abcd', file1))
    time.sleep(10)

def main():
    print("Start Time is ",datetime.datetime.now())

    # Continuous loop that watches the directory for incoming files
    while True:
        #Get the list of files in directory
        file_list = os.listdir('C:\\temp\\abcd')
        print("file_list is", file_list)
        input_num = random.randint(1000000000,9999999999)

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            process_file_arg = functools.partial(process_file, input_num=input_num)
            executor.map(process_file_arg, file_list)

        time.sleep(10)

if __name__ == '__main__':
    main()

The main() function continuously watches the directory and calls ThreadPoolExecutor on each iteration.


1 Answer


I ran into the same problem; this approach may help you.

concurrent.futures.wait returns the futures as a named 2-tuple of sets, done and not_done. We can consume the done set and keep adding new tasks alongside the not_done futures so the pool stays continuously busy. Here is an example snippet:

import concurrent.futures

thread_list = []
with open(input_filename, 'r') as fp_in:
    with concurrent.futures.ThreadPoolExecutor(max_workers=THREAD_LIMIT) as executor:
        for para_list in fp_in:
            thread_list.append(executor.submit(your_thread_func, para_list))
            if len(thread_list) >= THREAD_LIMIT:
                # Block until at least one task finishes (or the timeout expires)
                done, not_done = concurrent.futures.wait(
                    thread_list, timeout=1,
                    return_when=concurrent.futures.FIRST_COMPLETED)
                # consume the finished futures
                done_res = [i.result() for i in done]
                # and keep only the unfinished ones
                thread_list = list(not_done)
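
Applied to your directory-watching loop, the same idea would look roughly like the sketch below. Note that in your original code the with ThreadPoolExecutor(...) block sits inside the while loop, and exiting a with block waits for every submitted task to finish, so one 20-minute file stalls the next directory scan. Creating the executor once, outside the loop, avoids that. The WATCH_DIR/MAX_WORKERS names and the in_flight dict (which keeps a file from being submitted twice while it is still being processed) are my additions, not part of your code:

import os
import time
import concurrent.futures

WATCH_DIR = 'C:\\temp\\abcd'   # directory from the question
MAX_WORKERS = 5

def process_file(filename):
    # Stand-in for the real processing, as in the question:
    # simulate the work, then delete the file
    time.sleep(10)
    os.remove(os.path.join(WATCH_DIR, filename))

def main():
    in_flight = {}  # filename -> Future, so a file is never submitted twice
    # Create the executor once, outside the loop, so a slow task
    # never forces the loop to wait for the whole batch
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        while True:
            # Drop futures that have finished (raising any stored exception)
            for name, future in list(in_flight.items()):
                if future.done():
                    future.result()
                    del in_flight[name]
            # Submit every file that is not already being processed
            for name in os.listdir(WATCH_DIR):
                if name not in in_flight:
                    in_flight[name] = executor.submit(process_file, name)
            time.sleep(10)

if __name__ == '__main__':
    main()

This way the 4 free workers pick up new files as soon as they appear, while the fifth keeps working on the large file.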