
I need to read every file in the directory tree starting from a given root location. I would like to do this as fast as possible using parallelism. I have 48 cores at my disposal and 1 TB of RAM, so thread resources are not an issue. I also need to log every file that was read.

I looked at using `joblib` but was unable to combine it with `os.walk`.

I can think of two ways:

  • walk the tree, add every file to a queue or list, and have a worker pool of threads dequeue files - best load balancing, but possibly more time due to the initial walk and queue overhead (a sketch of this approach follows the list)
  • spawn threads and statically assign portions of the tree to each thread - poor load balancing, no initial walk; directories could be assigned based on a hash of some sort
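
A minimal sketch of the first approach, assuming Python 3, plain `threading` workers, and a `queue.Queue`; the names `reader` and `run`, the sentinel scheme, and the `print`-based logging are purely illustrative:

import os
import queue
import threading

NUM_WORKERS = 48      # assumption: one worker per core, as in the question
SENTINEL = None       # marker telling a worker to stop

def reader(work_queue):
    # worker: read each file pulled from the queue and log that it was read
    while True:
        path = work_queue.get()
        if path is SENTINEL:
            break
        with open(path, "rb") as f:
            f.read()
        print(path)   # "log every file that was read"

def run(root):
    work_queue = queue.Queue()
    workers = [threading.Thread(target=reader, args=(work_queue,))
               for _ in range(NUM_WORKERS)]
    for t in workers:
        t.start()
    for dirpath, dirnames, filenames in os.walk(root):   # single-threaded walk (the producer)
        for name in filenames:
            work_queue.put(os.path.join(dirpath, name))
    for _ in workers:
        work_queue.put(SENTINEL)   # one sentinel per worker so each one exits
    for t in workers:
        t.join()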

or is there a better way?

EDIT: performance of the storage is not a concern. Assume there is infinitely fast storage that can handle an infinite number of parallel reads.

EDIT: removed the multi-node situation to keep the focus on the parallel directory walk.

powerrox
  • What kind of storage are you trying to read from? You mention scaling across nodes, so I assume we're not just talking about a typical HDD/SSD here? – dano Apr 13 '15 at 20:40
  • Building on dano's comment, this sounds more like an HDD-limited operation than a CPU-limited operation. If you make many threads, you will honestly probably run *SLOWER* than one CPU since you're going to force many seeks!! – WakkaDojo Apr 13 '15 at 20:43
  • Right - the bottleneck here is most likely the disk I/O. It doesn't matter how many CPUs you throw at the problem if your disk doesn't support parallel reads. – dano Apr 13 '15 at 20:57
  • @WakkaDojo edited the question. The crux of the issue is not the performance or availability of the file system across nodes; it is to walk the tree and read files in parallel without duplication. – powerrox Apr 14 '15 at 01:33
  • @dano please see my edit or comment – powerrox Apr 14 '15 at 01:33
  • I don't think it's possible to give a single answer to this question without doing a bunch of profiling of the specific cluster architecture to know what the slowdowns will be in your situation. For instance, how much time does `os.walk` take to inspect the directory hierarchy, relative to the amount of time it takes to process all the files? How fast is the communication between processes on different nodes, relative to the file-IO speed from each node? Investigate the answers, and probably the best way to solve the overall problem will become obvious. – Blckknght Apr 14 '15 at 01:40
  • @Blckknght I am not concerned about performance. Maybe focus on a single-node solution if multi-node is overly complex. – powerrox Apr 14 '15 at 17:07

1 Answer


The simplest approach is probably to use a `multiprocessing.Pool` to process the output of an `os.walk` performed in the main process.

This assumes that the main work you want to parallelize is whatever processing takes place on the individual files, not the effort of recursively scanning the directory structure. This may not be true if your files are small and you don't need to do much processing on their contents. I'm also assuming that the process creation handled for you by multiprocessing will be able to properly distribute the load over your cluster (which may or may not be true).

import itertools
import multiprocessing
import os

def worker(filename):
    pass   # do something here!

def main():
    with multiprocessing.Pool(48) as pool:  # pool of 48 processes

        walk = os.walk("some/path")
        fn_gen = itertools.chain.from_iterable((os.path.join(root, file)
                                                for file in files)
                                               for root, dirs, files in walk)

        results_of_work = pool.map(worker, fn_gen)  # this does the parallel processing

if __name__ == "__main__":
    main()   # guard needed so the pool's child processes can import this module safely
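
The `worker` above is just a placeholder. Since the question also asks to log every file that was read, here is a hedged sketch of one possible `worker`; the use of the standard `logging` module and the exact message format are assumptions, not part of the original answer:

import logging

logging.basicConfig(level=logging.INFO)   # assumption: log to stderr; any handler would do

def worker(filename):
    # illustrative worker: read the whole file and log that it was read
    with open(filename, "rb") as f:
        data = f.read()
    logging.info("read %s (%d bytes)", filename, len(data))
    return len(data)

Because each pool process runs `worker` itself, the log lines come from the children; with the default fork start method on Linux they inherit this logging configuration, which seems sufficient for the 48-core single-node case described in the question.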

It is entirely possible that parallelizing the work this way will be slower than just doing the work in a single process. This is because I/O on the hard disks underlying your shared filesystem may be the bottleneck, and attempting many disk reads in parallel could make them all slower if the disks need to seek more often rather than reading longer linear sections of data. Even if the I/O is a little faster, the overhead of communicating between the processes could eat up all of the gains.

Blckknght
  • very cool! some changes are required though for this to work: 1) `os.path.walk` should be changed to `os.walk`; 2) the usage `with multiprocessing.Pool(48) as Pool` will result in `AttributeError: 'Pool' object has no attribute '__exit__'`. I used the contextlib solution from here: http://stackoverflow.com/questions/27065237/attributeerror-pool-object-has-no-attribute-exit – powerrox Apr 15 '15 at 02:33
  • the generators returned in `fn_gen` cannot be serialized, so the code still doesn't work after those changes. – powerrox Apr 15 '15 at 02:39
  • this worked: `results_of_work = pool.map(worker, [list(j) for j in fn_gen])` – powerrox Apr 15 '15 at 03:39
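
Putting the comment fixes together, a hedged sketch of a Python 2-friendly variant might look like the following; `contextlib.closing` stands in for the context-manager support that `Pool` only gained in Python 3.3, and materializing the paths into a plain list before `pool.map` is one way (an assumption, not the commenter's exact code) to avoid handing generators to the pool:

import contextlib
import itertools
import multiprocessing
import os

def worker(filename):
    pass   # do something here!

def main():
    with contextlib.closing(multiprocessing.Pool(48)) as pool:
        walk = os.walk("some/path")
        fn_gen = itertools.chain.from_iterable(
            (os.path.join(root, name) for name in files)
            for root, dirs, files in walk)
        paths = list(fn_gen)                       # materialize before handing to the pool
        results_of_work = pool.map(worker, paths)

if __name__ == "__main__":
    main()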