
I am trying to read and process thousands of files, but unfortunately it takes about 3x as long to process a file as it does to read it in from disk, so I would like to process these files as they are read in (and while I continue to read in additional files).

In a perfect world, I have a generator which reads one file at a time, and I would like to pass this generator to a pool of workers which process items from the generator as they are (slowly) generated.

Here's an example:

import os
from multiprocessing import Pool

def process_file(file_string):
    ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (open(path + part, 'rb').read() for part in os.listdir(path)))

The only issue with the code above is that all the files are read into memory before the pool begins its work, which means I have to wait for the disk to read everything in, and I also consume a large amount of memory.

mgoldwasser
  • Try specifying a `chunksize` argument to the `pool.map()` call to control how many elements of the iterable it submits to the `Pool` as separate tasks at a time. – martineau Dec 07 '15 at 21:30
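For reference, a minimal sketch of that suggestion, reusing the `process_file` and `path` from the question (the chunksize value of 10 is an arbitrary placeholder to tune):

# Hypothetical chunksize example; 10 is an arbitrary value to tune.
# Note that pool.map() still converts the generator to a list before
# dispatching, so this controls batching per task, not total memory use.
results = pool.map(
    process_file,
    (open(path + part, 'rb').read() for part in os.listdir(path)),
    chunksize=10,
)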

2 Answers


Pool.map and Pool.map_async listify the iterable passed to them, so your generator will always be realized fully before processing even begins.
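A small, hypothetical way to see that behaviour (not from the answer): every "reading ..." line below prints before any result comes back, because `Pool.map()` turns the generator into a list before dispatching any work.

import os
from multiprocessing import Pool

def slow_read(path):
    # Runs in the parent as the generator is consumed; all of these print
    # before Pool.map() hands the first task to a worker.
    print('reading', path)
    with open(path, 'rb') as f:
        return f.read()

def process_file(file_string):
    return len(file_string)

if __name__ == '__main__':
    path = 'some/path/'
    with Pool(processes=4) as pool:
        gen = (slow_read(os.path.join(path, part)) for part in os.listdir(path))
        results = pool.map(process_file, gen)  # gen is fully consumed here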

The various Pool.imap* functions appear to process inputs as generators, so you might be able to change:

results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))

to:

# If you can process outputs one at a time, drop the list wrapper
# If you can process outputs without order mattering, imap_unordered will
# get you the best results
results = list(pool.imap(process_file, (open(path+part,'rb').read() for part in os.listdir(path))))

and get the same results without slurping everything before processing. But AFAICT, they'll still try to fully populate their queues as fast as they can, which could leave a lot of data outstanding and cause excessive memory usage. Beyond that, you'd be reading all the data in one process and then sending all of it over IPC, which means you're still mostly bottlenecked on I/O.
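For completeness, the unordered variant mentioned in the code comments might look like the sketch below; it still reads everything in the parent process, so the caveats above apply, and `handle()` is a hypothetical stand-in for whatever you do with each processed file.

# Hypothetical imap_unordered variant: each result is handled as soon as
# any worker finishes it, in completion order rather than input order.
for processed in pool.imap_unordered(
        process_file,
        (open(path + part, 'rb').read() for part in os.listdir(path))):
    handle(processed)  # hypothetical consumer of each result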

In your position, I'd move the read into the task itself (and, if you can, avoid slurping the whole file, processing it by line or by block instead; a sketch of that variant follows the code below). You'd get parallel reads and less IPC, and you won't risk slurping all the files before the first few are even processed; you'll never have more files open than you have workers. The end result would look like:

import os
from multiprocessing import Pool

def process_file(path):
    with open(path, 'rb') as f:
        file_string = f.read()
    ...  # same as before
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.imap(process_file, (os.path.join(path, part) for part in os.listdir(path)))
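If the per-file work can actually be done incrementally, a hedged sketch of the by-line variant might look like this (the `process_line` and `combine` helpers are hypothetical placeholders, since the question doesn't say what the processing involves):

def process_file(path):
    # Hypothetical line-by-line variant: holds only one line in memory at a
    # time and leaves gaps between reads for other workers to use the disk.
    partial_results = []
    with open(path, 'rb') as f:
        for line in f:
            partial_results.append(process_line(line))  # hypothetical helper
    return combine(partial_results)  # hypothetical helper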
ShadowRanger
  • If multiple children are trying to read files, I believe this will cause interleaved disk reads and disk thrashing? – mgoldwasser Dec 07 '15 at 23:06
  • @mgoldwasser: Depends on the storage medium; on NFS, for instance, you're often limited by latency or per-connection bandwidth more than by simultaneous reads. Testing would be needed to determine whether it's a performance problem. If reading blocks or lines instead of slurping the whole file (as I suggested in the answer, though I lacked the information to give a useful example), the work done between block reads might reduce conflicts. Alternatively, when slurping, a single `multiprocessing.Lock` could be used to limit reads to a single worker at a time (a sketch of that appears after these comments). The best option varies by circumstances. – ShadowRanger Dec 07 '15 at 23:19
  • mgoldwasser, you can measure it ;) Just compare the two possible solutions: mine with yield and @ShadowRanger's with multiprocessing reading, because it depends on hardware/software/network... – Jimilian Dec 08 '15 at 09:54
  • @ShadowRanger it turns out that moving the file reading into the process_file function had no effect on the time it takes to run. Using `imap()` was the real key to speed - thanks! – mgoldwasser Dec 08 '15 at 21:22
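A hedged sketch of the `multiprocessing.Lock` idea from the comment above, assuming whole-file slurping: the lock is handed to each worker through the pool initializer, so only one worker touches the disk at a time while the processing itself still runs in parallel.

import os
from multiprocessing import Pool, Lock

def init_worker(lock):
    # Stash the shared lock in a module-level global inside each worker.
    global read_lock
    read_lock = lock

def process_file(path):
    with read_lock:  # serialize only the disk read
        with open(path, 'rb') as f:
            file_string = f.read()
    ...  # processing happens outside the lock, in parallel
    return processed_file

if __name__ == '__main__':
    lock = Lock()
    pool = Pool(processes=4, initializer=init_worker, initargs=(lock,))
    path = 'some/path/'
    results = pool.imap(process_file, (os.path.join(path, part) for part in os.listdir(path)))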

You are reading the files into the parent's memory and then transferring the payload into the children. That's rather inefficient. Send just the filename and let the children do the I/O. If the result is a bunch of text that you plan to write to a file, do that in the child also.

map will normally issue large blocks of work in one shot to reduce communication overhead with its pool workers. That's probably why you get the big memory spike. Passing just the filename solves that problem, but setting a small chunksize is still beneficial when processing time is uneven among the workers.

import os
from multiprocessing import Pool

def process_file(filename):
    with open(filename, 'rb') as fp:
        file_string = fp.read()
    ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (path + part for part in os.listdir(path)), chunksize=1)
tdelaney
  • If multiple children are trying to read files, I believe this will cause interleaved disk reads and disk thrashing? – mgoldwasser Dec 07 '15 at 23:04
  • @mgoldwasser - It can, but you want to keep your disk channel busy by having a few workers fetch data while others are busy processing. And there is some advantage to queue reordering as the heads fly past data. You won't have significant thrashing with 4 workers reading. – tdelaney Dec 07 '15 at 23:39
  • This method does work, and it cuts down the time considerably. It turns out that using `imap()` instead of `map()` gives you the biggest benefit. – mgoldwasser Dec 08 '15 at 21:24
  • Thanks for the update. How multiprocessing tweaks work in the real world can be surprising. – tdelaney Dec 08 '15 at 23:02