I am trying to parallelize the following code to get it to run faster:

import os

fs = 0
for root, dirs, files in os.walk(os.getcwd()):
  for file in files:
    fp = os.path.join(root, file)
    fs += os.path.getsize(fp)
      
print(fs)

I tried the following:

import os
from concurrent.futures import ProcessPoolExecutor, as_completed

def main():

  total_size = 0
  with ProcessPoolExecutor() as executor:
    for root, dirs, files in os.walk(os.getcwd()):
      futures = []
      for file in files:
        file_path = os.path.join(root, file)
        futures.append(executor.submit(os.path.getsize, file_path))

      for future in as_completed(futures):
        total_size += future.result()

  print(total_size)
  
if __name__ == '__main__':
  main()

but it is far slower than my original code (iterating over files on an SSD). I'm on Windows using Python 3.8. Any idea on how to properly speed it up?

joejoejoejoe4

2 Answers

You should batch the files; otherwise the overhead of the futures is probably larger than the time saved. For 1 million small files on Linux, the following code gave me a ~2x speedup over the version without ProcessPoolExecutor. You might need to adjust the number of files per batch for your system (10k in my example).

import os
from concurrent.futures import ProcessPoolExecutor, as_completed

def getsize_list(files):
  fs = 0
  for file in files:
    fs += os.path.getsize(file)
  return fs

def main():
  total_size = 0
  with ProcessPoolExecutor() as executor:
    for root, dirs, files in os.walk(os.getcwd()):
      futures = []

      current_list = []
      for file in files:
        file_path = os.path.join(root, file)
        current_list.append(file_path)
        if len(current_list)==10_000:
          futures.append(executor.submit(getsize_list, current_list))
          current_list = []
      if current_list:  # submit the remaining partial batch, if any
        futures.append(executor.submit(getsize_list, current_list))

      for future in as_completed(futures):
        total_size += future.result()

  print(total_size)

if __name__ == '__main__':
  main()
vladmihaisima

The second code is slower because of inter-process communication. To understand why, we first need to understand how things work.

During the first run, the OS actually reads from the SSD and puts the results in a RAM cache. Subsequent executions of this code are significantly faster because of that (no need to interact with the SSD at all). On my machine the first serial run takes about 10 seconds while the second takes about 5 seconds. Depending on the OS, the file system and the SSD driver, a system lock may be used, so this operation may not be faster in parallel (this is done to prevent corruption and to ease the development of some parts of the system stack). Historically, some OSes even used a horribly inefficient giant lock. Note that the scalability of the operation depends on whether the RAM cache is used (it likely scales better with a cache). I will focus on the case where things are in the cache since it is easier to reproduce.
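To see the cache effect described above on your own machine, something like the following sketch can be used. The helper names `tree_size` and `time_runs` are made up for illustration. The first run only behaves like a cold run if the directory has not been read recently.

```python
import os
import time

def tree_size(top):
    """Serial sum of file sizes under top (same logic as the question's loop)."""
    total = 0
    for root, dirs, files in os.walk(top):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total

def time_runs(top, runs=3):
    """Return a list of (elapsed_seconds, total_bytes), one entry per run."""
    results = []
    for _ in range(runs):
        start = time.perf_counter()
        total = tree_size(top)
        results.append((time.perf_counter() - start, total))
    return results

if __name__ == "__main__":
    for i, (elapsed, total) in enumerate(time_runs(os.getcwd()), start=1):
        print(f"run {i}: {elapsed:.3f} s, {total} bytes")
```

If the later runs are clearly faster than the first, the metadata is most likely being served from the RAM cache.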

In the first code, most of the time is spent in os.path.getsize (about 80% on my machine, 4 s). Then comes os.walk (about 15%, 0.75 s), then the loop overhead and the string handling (about 5%, 0.25 s). On my machine, each call to os.path.getsize takes about 50 µs on average.
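A rough way to get a similar breakdown without a profiler is to time the walk and the size queries separately. This is only a sketch: `measure_breakdown` is a hypothetical helper, and it lumps the path joining in with os.walk, so the split will not match a real profiler exactly.

```python
import os
import time

def measure_breakdown(top):
    """Time the directory walk and the getsize calls separately."""
    t0 = time.perf_counter()
    paths = [os.path.join(root, name)
             for root, dirs, files in os.walk(top)
             for name in files]
    t1 = time.perf_counter()
    total = sum(os.path.getsize(p) for p in paths)
    t2 = time.perf_counter()
    return {
        "files": len(paths),
        "total_bytes": total,
        "walk_s": t1 - t0,     # os.walk plus path joining
        "getsize_s": t2 - t1,  # os.path.getsize calls only
    }

if __name__ == "__main__":
    stats = measure_breakdown(os.getcwd())
    print(stats)
    if stats["files"]:
        per_call_us = stats["getsize_s"] / stats["files"] * 1e6
        print(f"about {per_call_us:.1f} us per getsize call")
```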

The thing is that system calls are expensive, and every call to os.path.getsize has to enter the kernel, which typically involves the IO scheduler and the task scheduler, context switches and synchronizations. Not to mention that the OS needs to parse, check and resolve each part of the full path to actually get the stat information about the target file. In the end, 50 µs is actually not so bad for this operation. On Windows, os.path.getsize makes several system calls: it opens the file with CreateFileW, gets the size with GetFileInformationByHandle and then closes the file with CloseHandle. CreateFileW takes more than 70% of the function time.

In the second code, executor.submit causes data to be pickled, sent through inter-process communication (IPC) and unpickled by the target process. IPC operations are expensive since they typically cause context switches and low-level synchronizations. Getting the results back does the same thing. In the end, most of the time should be spent in such overheads, especially since the main process is slowed down while doing these operations, so the workers should actually be waiting/starving most of the time. Doing that for each file is very expensive, and the parallel code can thus be significantly slower than the serial code.

Using a batch computation as pointed out by @vladmihaisima is a first step to reduce the context-switch overhead. Still, the overheads of pickling + unpickling + transferring data are not removed by this method, though they should be smaller than initially. The main issue comes from using multiprocessing itself. One solution is to use threads instead of processes to get rid of the aforementioned overheads. However, the Global Interpreter Lock (GIL) causes the string handling to be serialized, slowing down the computation a bit (actually more than pickling does on my machine).
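For comparison, the thread-based alternative mentioned above could look roughly like the sketch below. It avoids pickling and IPC but still runs under the GIL, so it may well end up slower than the process version, as it did on my machine. The names `iter_chunks` and `threaded_tree_size` and the 1024-file threshold are illustrative choices, not something prescribed by concurrent.futures.

```python
import os
from concurrent.futures import ThreadPoolExecutor

CHUNK_FILES = 1024  # illustrative threshold: files per submitted task

def compute_chunk_size(file_chunk):
    """Sum the sizes of all files in a list of (root, files) blocks."""
    total = 0
    for root, files in file_chunk:
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total

def iter_chunks(top, chunk_files=CHUNK_FILES):
    """Group os.walk results into chunks of at least chunk_files files."""
    chunk, count = [], 0
    for root, dirs, files in os.walk(top):
        chunk.append((root, files))
        count += len(files)
        if count >= chunk_files:
            yield chunk
            chunk, count = [], 0
    if count > 0:
        yield chunk  # last, partially filled chunk

def threaded_tree_size(top, max_workers=None):
    """Sum file sizes under top using worker threads (no pickling, no IPC)."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return sum(executor.map(compute_chunk_size, iter_chunks(top)))

if __name__ == "__main__":
    print(threaded_tree_size(os.getcwd()))
```

Since threads share memory, the chunks are passed by reference and nothing is serialized. Whether that outweighs the GIL contention depends on the platform, so it is worth timing both variants.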

Here is the resulting code with chunks, still using processes:

import os
from concurrent.futures import ProcessPoolExecutor, as_completed

def compute_chunk_size(fileChunk):
  total_size = 0
  for block in fileChunk:
    root, files = block
    for file in files:
      file_path = os.path.join(root, file)
      total_size += os.path.getsize(file_path)
  return total_size

def main():
  total_size = 0
  with ProcessPoolExecutor() as executor:
    fileChunk = []
    fileChunkSize = 0
    futures = []
    for root, dirs, files in os.walk(os.getcwd()):
      fileChunk.append((root, files))
      fileChunkSize += len(files)
      if fileChunkSize >= 1024:
        futures.append(executor.submit(compute_chunk_size, fileChunk))
        fileChunk = []
        fileChunkSize = 0
    # Submit the last, partially filled chunk
    if fileChunkSize > 0:
      futures.append(executor.submit(compute_chunk_size, fileChunk))
    # as_completed yields every future exactly once, so a single loop suffices
    for future in as_completed(futures):
      total_size += future.result()

  print(total_size)

if __name__ == '__main__':
  main()

It is about 4 times faster on my i5-9600KF (6-core) processor.

Here is the profiling information of the parallel execution:

[Image: profiling timeline of the parallel execution]

Each line is a process. The process with PID 26172 is the master (it starts the others and waits for them). The brown part is the CPU time (activity) of the processes. The profile is split into small slices of 1 ms. Light grey and light green slices are those where there was a context switch or a synchronization, and dark green slices are those where the process was only doing computation.

What we can see is that the workers are relatively active, but there is a lot of time where they are starving or waiting for something (typically access to IO-related resources). The main thread is slowed down by the workers since the operation does not scale perfectly. In fact, the calls to os.path.getsize are about 10-15% slower, mainly because CreateFileW does not scale (certainly due to system locks). The parallel computation cannot be faster than os.walk itself. On some platforms with many cores and a scalable OS file subsystem, os.walk may be the bottleneck. If so, the file tree can be traversed using recursive tasks at the expense of significantly more complex code.
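As a rough idea of what such a task-based traversal could look like, here is a minimal sketch with one task per directory. It makes several assumptions: it uses threads rather than processes (so the GIL caveat above applies), it uses os.scandir instead of os.walk plus os.path.getsize, and it skips symbolic links, which the original loop would follow for files. The names `scan_dir` and `parallel_tree_size` and the default of 8 workers are made up for illustration.

```python
import os
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def scan_dir(path):
    """Return (bytes of regular files directly in path, list of subdirectories)."""
    size = 0
    subdirs = []
    try:
        with os.scandir(path) as entries:
            for entry in entries:
                if entry.is_dir(follow_symlinks=False):
                    subdirs.append(entry.path)
                elif entry.is_file(follow_symlinks=False):
                    size += entry.stat(follow_symlinks=False).st_size
    except OSError:
        pass  # unreadable directory: skip it, like os.walk does by default
    return size, subdirs

def parallel_tree_size(top, max_workers=8):
    """Sum file sizes under top, scanning each directory as a separate task."""
    total = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = {executor.submit(scan_dir, top)}
        while pending:
            # Workers never wait on other tasks; only the main thread does,
            # so the pool cannot deadlock on itself.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                size, subdirs = future.result()
                total += size
                for sub in subdirs:
                    pending.add(executor.submit(scan_dir, sub))
    return total

if __name__ == "__main__":
    print(parallel_tree_size(os.getcwd()))
```

The main thread only schedules work and adds up results, so the traversal itself is spread over the workers instead of being bounded by a single os.walk loop. Whether this pays off depends on how well the OS file subsystem scales.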

In practice, on Windows, it turns out that the os.path.getsize implementation is apparently not efficient. Based on this post, using GetFileAttributesEx should be significantly faster. The bad news is that the alternative function os.stat also uses the same method. Thus, if you want to do this operation efficiently, it is certainly a good idea to write a C extension for it. Such a C extension can also benefit from threads without any of the issues with the GIL, pickling, or IPC.

Jérôme Richard