6

I have a program which copies large numbers of files from one location to another - I'm talking 100,000+ files (I'm copying 314 GB of image sequences at the moment). They're both on huge, VERY fast network storage, heavily RAID'd. I'm using shutil to copy the files over sequentially and it is taking some time, so I'm trying to find the best way to optimize this. I've noticed some software I use effectively multi-threads reading files off of the network, with huge gains in load times, so I'd like to try doing this in Python.

I have no experience with multithreading/multiprocessing - does this seem like the right area to pursue? If so, what's the best way to do it? I've looked at a few other SO posts about threading file copies in Python and they all seemed to say you get no speed gain, but I don't think that will be the case given my hardware. I'm nowhere near my IO cap at the moment and resources are sitting around 1% (I have 40 cores and 64 GB of RAM locally).

EDIT

Been getting some up-votes on this question (now a few years old), so I thought I'd point out one more thing to speed up file copies. In addition to the fact that you can easily get 8x-10x copy speeds using some of the answers below (seriously!), I have also since found that shutil.copy2 is excruciatingly slow for no good reason - yes, even in Python 3+. It is beyond the scope of this question so I won't dive into it here (it's also highly OS- and hardware/network-dependent), beyond mentioning that by tweaking the copy buffer size used by copy2 you can increase copy speeds by yet another factor of 10. (Note, however, that you will start running into bandwidth limits, and the gains are not linear when multi-threading AND tweaking buffer sizes; at some point it does flatline.)
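For illustration, here is a minimal sketch of the buffer tweak: instead of letting copy2 use its small default chunk size, copy the file contents yourself with shutil.copyfileobj and a larger buffer. The 16 MB value below is only an example, not a recommendation - the sweet spot depends entirely on your hardware and network.

import shutil

def copy_with_big_buffer(src, dst, buffer_size=16 * 1024 * 1024):
    # copy the raw bytes with a larger buffer than shutil's default
    with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
        shutil.copyfileobj(fsrc, fdst, length=buffer_size)
    # preserve timestamps/permissions, roughly what copy2 does on top of copy
    shutil.copystat(src, dst)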

Spencer
  • have you looked at `threading.Thread`? https://docs.python.org/2/library/threading.html You can create multiple threads, start and join them, I'm not sure if that's going to help, but it's the only thing that I can think of. – Juan Avalos Jun 02 '17 at 03:44
  • Hey Juan, I can certainly dive into this. I guess my question was more if it is worth teaching myself how to do this when in the end it might not even be faster. In other words, does anyone have experience speeding up copy times with threading using that? – Spencer Jun 02 '17 at 03:47
  • Hm, based on this [link](https://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python) I think multiprocessing would be better than threading because "Processes have independent I/O scheduling." – Spencer Jun 02 '17 at 03:52
  • https://repl.it/I2hT/0 you can try something like that, it's not too complicated, I have never used multi-processing, just multi-threading. hope it helps. – Juan Avalos Jun 02 '17 at 04:02
  • Kudos if you want to play with python, but you might get there quicker with 40 shells that run 40 copy commands. eg "cp A*.*" and "cp B*.*" etc. Or a bash script to do the same – John Mee Jun 02 '17 at 05:11
  • @JohnMee Got it working in the end! The sweet spot for me was about 16 cores. I actually saw a speed decrease after 20. https://stackoverflow.com/questions/8584797/multithreaded-file-copy-is-far-slower-than-a-single-thread-on-a-multicore-cpu/45526392#45526392 – Spencer Aug 07 '17 at 19:14
  • @Spencer ping me if you ever do the gevent version; i'd be curious to hear the result. – John Mee Aug 09 '17 at 00:05

5 Answers

8

UPDATE:

I never did get Gevent working (first answer) because I couldn't install the module without an internet connection, which I don't have on my workstation. However, I was able to decrease file copy times by a factor of 8 just using the built-in threading module in Python (which I have since learned how to use), and I wanted to post it as an additional answer for anyone interested. My code is below; it is probably important to note that my 8x speedup will most likely differ from environment to environment due to your hardware/network set-up.

# Python 2 code: on Python 3, import queue (lowercase) instead of Queue and use print()
import Queue, threading, os
import shutil

fileQueue = Queue.Queue()
destPath = 'path/to/cop'

class ThreadedCopy:
    totalFiles = 0
    copyCount = 0
    lock = threading.Lock()

    def __init__(self):
        with open("filelist.txt", "r") as txt: #txt with a file per line
            fileList = txt.read().splitlines()

        if not os.path.exists(destPath):
            os.mkdir(destPath)

        self.totalFiles = len(fileList)

        print str(self.totalFiles) + " files to copy."
        self.threadWorkerCopy(fileList)


    def CopyWorker(self):
        while True:
            fileName = fileQueue.get()
            shutil.copy(fileName, destPath)
            fileQueue.task_done()
            with self.lock:  # keep the shared progress counter consistent across threads
                self.copyCount += 1
                percent = (self.copyCount * 100) / self.totalFiles
                print str(percent) + " percent copied."

    def threadWorkerCopy(self, fileNameList):
        # start 16 daemon worker threads, then feed the queue and wait for it to drain
        for i in range(16):
            t = threading.Thread(target=self.CopyWorker)
            t.daemon = True
            t.start()
        for fileName in fileNameList:
            fileQueue.put(fileName)
        fileQueue.join()

ThreadedCopy()
Spencer
  • This solution works for one file at a time. How would I be able to loop through multiple files with their paths? I tried looping through this code, but at some point I get the error "Can't start new thread". – Sam Aug 02 '18 at 19:21
  • Hey Sam, you're right that there is a missing prior step here. Notice there is a Queue object called "fileQueue"; this needs to be populated with tuples containing the source and destination files prior to running the threads. Something like fileQueue.put(("path/to/source/file.txt", "path/to/dest/file.txt")). Check out the documentation on [Queue](https://docs.python.org/2/library/queue.html) (it's pretty simple). – Spencer Aug 03 '18 at 18:26
  • Thank you so much Spencer for taking the time to respond to my comment. – Sam Aug 06 '18 at 13:27
  • @Spencer: I get the error OSError: [Errno 24] Too many open files - do you have an idea why? – Varlor Nov 16 '18 at 15:40
  • And does the 16 mean 16 cores? – Varlor Nov 16 '18 at 16:34
  • @Varlor Yes, but I'd lower that number. Test your system, it will definitely vary across hardware setups. And I'm not sure on that error, I'd have to see some code. – Spencer Nov 17 '18 at 01:57
  • @Spencer Ok, I think the problem is that I tried a multiprocessing approach and it starts too many processes. But I still have a problem with your implementation: after the copying is finished, the 16 or 32 daemonic threads are still alive, and by starting the script over and over again I get more and more open daemonic threads. – Varlor Nov 19 '18 at 10:33
  • @Varlor Why are they still alive? It sounds like your copy logic is wrong somehow. Daemonic just means that they will shut down abruptly when your script ends (hence joining the queue). Don't remember now why I made them daemonic but I think you can remove that... Maybe post up a question and I'll see if I can poke around in it? – Spencer Nov 20 '18 at 03:49
  • @Varlor and sorry if I can't be more helpful at the moment, I just haven't used the threading module in about a year - I switched to PyQt threading since I posted this and somehow forgot just about everything I thought I knew... – Spencer Nov 20 '18 at 03:51
  • @Spencer I was using the copyfile method from here: https://stackoverflow.com/questions/22078621/python-how-to-copy-files-fast, and the threading code from this post. The only thing I changed was to create the Queue.Queue inside the class. – Varlor Nov 20 '18 at 08:46
5

How about using a ThreadPool?

import os
import glob
import shutil
from functools import partial
from multiprocessing.pool import ThreadPool

DST_DIR = '../path/to/new/dir'
SRC_DIR = '../path/to/files/to/copy'

# copy_to_mydir will copy any file you give it to DST_DIR
copy_to_mydir = partial(shutil.copy, dst=DST_DIR)

# list of files we want to copy
to_copy = glob.glob(os.path.join(SRC_DIR, '*'))

with ThreadPool(4) as p:
    p.map(copy_to_mydir, to_copy)
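Note that glob(os.path.join(SRC_DIR, '*')) only picks up the top level of the source directory. If the source contains subdirectories, a sketch of a variant that rebuilds the relative layout first might look like this (the directory paths are placeholders):

import os
import shutil
from multiprocessing.pool import ThreadPool

SRC_DIR = '../path/to/files/to/copy'
DST_DIR = '../path/to/new/dir'

# collect (src, dst) pairs, recreating the relative directory layout under DST_DIR
pairs = []
for root, dirs, files in os.walk(SRC_DIR):
    rel = os.path.relpath(root, SRC_DIR)
    target_dir = os.path.join(DST_DIR, rel)
    os.makedirs(target_dir, exist_ok=True)
    for name in files:
        pairs.append((os.path.join(root, name), os.path.join(target_dir, name)))

with ThreadPool(4) as p:
    p.starmap(shutil.copy, pairs)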
David Diaz
3

This can be parallelized by using gevent in Python.

I would recommend the following logic to speed up copying 100k+ files:

  1. Put the names of all the 100K+ files that need to be copied into a csv file, e.g. 'input.csv'.

  2. Then create chunks from that csv file. The number of chunks should be decided based on the number of processors/cores in your machine.

  3. Pass each of those chunks to a separate thread.

  4. Each thread sequentially reads the filenames in its chunk and copies each file from one location to another.

Here's the Python code snippet:

import sys
import os
import multiprocessing

from gevent import monkey
monkey.patch_all()

from gevent.pool import Pool

def _copyFile(file):
    # put your own copy logic here, e.g. shutil.copy(file, destination_dir)
    pass

def _worker(csv_file, chunk):
    f = open(csv_file)
    f.seek(chunk[0])
    for file in f.read(chunk[1]).splitlines():
        _copyFile(file)


def _getChunks(file, size):
    f = open(file)
    while 1:
        start = f.tell()
        f.seek(size, 1)
        s = f.readline()
        yield start, f.tell() - start
        if not s:
            f.close()
            break

if __name__ == "__main__":
    if len(sys.argv) > 1:
        csv_file_name = sys.argv[1]
    else:
        print("Please provide a csv file as an argument.")
        sys.exit()

    no_of_procs = multiprocessing.cpu_count() * 4

    file_size = os.stat(csv_file_name).st_size

    file_size_per_chunk = file_size // no_of_procs  # integer division so seek/read get an int

    pool = Pool(no_of_procs)

    for chunk in _getChunks(csv_file_name, file_size_per_chunk):
        pool.apply_async(_worker, (csv_file_name, chunk))

    pool.join()

Save the file as file_copier.py, then open a terminal and run:

$ python file_copier.py input.csv
Rajanya Dhar
  • Thank you for the detailed response! I'll have to take some time to go through this and make sure I understand everything. Before I have a chance to do that though, I forgot to mention that sometimes I only have a few files to process - so it will vary from a couple of files to 100k+. Will I see a significant speed decrease by doing this? I suppose I could always set a threshold such that if it is more than n files, then multithread it. – Spencer Jun 02 '17 at 04:51
  • You are welcome! :) I hope my solution will be helpful to you. Yes, with only a few files you will see a significant slowdown compared to sequential processing, so setting up a threshold on the number of files is a good idea. With respect to the code snippet, you can set n = no_of_cores * 4. – Rajanya Dhar Jun 02 '17 at 05:49
1

If you just want to copy a directory tree from one path to another, here's my solution, which is a little simpler than the previous ones. It leverages multiprocessing.pool.ThreadPool and uses a custom copy function with shutil.copytree:

import shutil
from multiprocessing.pool import ThreadPool


class MultithreadedCopier:
    def __init__(self, max_threads):
        self.pool = ThreadPool(max_threads)

    def copy(self, source, dest):
        self.pool.apply_async(shutil.copy2, args=(source, dest))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.pool.close()
        self.pool.join()


src_dir = "/path/to/src/dir"
dest_dir = "/path/to/dest/dir"


with MultithreadedCopier(max_threads=16) as copier:
    shutil.copytree(src_dir, dest_dir, copy_function=copier.copy)
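One caveat: pool.apply_async silently swallows any exception raised inside shutil.copy2, so a failed copy can go unnoticed. A sketch of a variant that keeps the AsyncResult objects and re-raises errors when the pool shuts down (the class name is illustrative, not part of the original answer):

import shutil
from multiprocessing.pool import ThreadPool


class ErrorAwareCopier:
    def __init__(self, max_threads):
        self.pool = ThreadPool(max_threads)
        self.results = []

    def copy(self, source, dest):
        # keep the AsyncResult so any exception can be re-raised later
        self.results.append(self.pool.apply_async(shutil.copy2, args=(source, dest)))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.pool.close()
        self.pool.join()
        for r in self.results:
            r.get()  # re-raises the exception from the worker thread, if any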
Joe Savage
0

While re-implementing the code posted by @Spencer, I ran into the same error as mentioned in the comments below his post (to be more specific: OSError: [Errno 24] Too many open files). I solved this issue by moving away from the daemonic threads and using concurrent.futures.ThreadPoolExecutor instead, which seems to handle the opening and closing of the files to copy in a better way. With that change, all the code stayed the same except for the imports (concurrent.futures and typing.List) and the threadWorkerCopy(self, filename_list: List[str]) method, which now looks like this:

    def threadWorkerCopy(self, filename_list: List[str]):
        """
        Initializes the workers for the multi-threaded copy; the worker threads are handled automatically by
        ThreadPoolExecutor. More info about multi-threading can be found here: https://realpython.com/intro-to-python-threading/.
        A recurrent problem with the threading here was "OSError: [Errno 24] Too many open files". It came from the fact
        that daemon threads were not killed before the end of the script, so everything opened by them was never closed.

        Args:
            filename_list (List[str]): List containing the names of the files to copy.
        """
        # `cores` is the number of worker threads (e.g. 16), defined elsewhere in the script
        with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
            # start one CopyWorker per worker thread (submitting just one would serialize the copies);
            # CopyWorker must return once the queue is drained, otherwise the executor's shutdown will block
            for _ in range(cores):
                executor.submit(self.CopyWorker)

            for filename in filename_list:
                self.file_queue.put(filename)
            self.file_queue.join()  # the program waits here until every queued file has been copied
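For reference, if you don't need the shared queue and progress counter, a more self-contained sketch (an illustrative example, not part of this answer) lets ThreadPoolExecutor distribute the file list directly and surfaces any copy error:

import concurrent.futures
import shutil

def threaded_copy(filename_list, dest_dir, max_workers=16):
    # copy every file in filename_list into dest_dir using a thread pool
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(shutil.copy, f, dest_dir) for f in filename_list]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raises the exception if a copy failed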
DCleres