
I am trying to parse many files found in a directory; however, using multiprocessing slows my program down.

# Calling my parsing function from Client.
L = getParsedFiles('/home/tony/Lab/slicedFiles')  # <-- 1000 .txt files found here, ~100MB combined

Following this example from the Python documentation:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

I've written this piece of code:

from multiprocessing import Pool
from api.ttypes import *

import gc
import os

def _parse(pathToFile):
    myList = []
    with open(pathToFile) as f:
        for line in f:
            s = line.split()
            x, y = [int(v) for v in s]
            obj = CoresetPoint(x, y)
            gc.disable()
            myList.append(obj)
            gc.enable()
    return Points(myList)

def getParsedFiles(pathToFile):
    myList = []
    p = Pool(2)
    for filename in os.listdir(pathToFile):
        if filename.endswith(".txt"):
            myList.append(filename)
    return p.map(_parse, myList)

I followed the example: I put the names of all the files ending with .txt in a list, created a Pool, and mapped my function over that list. I then want to return a list of objects, where each object holds the parsed data of one file. However, it amazes me that I got the following results:

#Pool 32 ---> ~162 s
#Pool 16 ---> ~150 s
#Pool 12 ---> ~142 s
#Pool 2  ---> ~130 s

Graph: (image not included)

Machine specification:

62.8 GiB RAM
Intel® Core™ i7-6850K CPU @ 3.60GHz × 12   

What am I missing here?
Thanks in advance!

Tony Tannous

3 Answers


Looks like you're I/O bound:

In computer science, I/O bound refers to a condition in which the time it takes to complete a computation is determined principally by the period spent waiting for input/output operations to be completed. This is the opposite of a task being CPU bound. This circumstance arises when the rate at which data is requested is slower than the rate it is consumed or, in other words, more time is spent requesting data than processing it.

You probably need to have your main thread do the reading and hand the data to the pool when a subprocess becomes available. This will be different from using map.
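
For example, one way to arrange that (a rough sketch, reusing the CoresetPoint/Points types from the question; getParsedFiles and _parse_text here are just illustrative names):

from multiprocessing import Pool
from pathlib import Path
import os

from api.ttypes import CoresetPoint, Points  # as in the question

def _parse_text(text):
    # CPU-bound part only: turn one file's contents into a Points object.
    points = []
    for line in text.splitlines():
        x, y = [int(v) for v in line.split()]
        points.append(CoresetPoint(x, y))
    return Points(points)

def getParsedFiles(directory):
    paths = [os.path.join(directory, name)
             for name in os.listdir(directory) if name.endswith(".txt")]
    with Pool(2) as pool:
        # The main process does the (sequential) disk reads...
        texts = (Path(p).read_text() for p in paths)
        # ...and only the parsing is farmed out to the workers.
        return pool.map(_parse_text, texts)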

As you are processing a line at a time, and the inputs are split, you can use fileinput to iterate over lines of multiple files, and map to a function processing lines instead of files:

Passing one line at a time to the workers might be too slow, so we can group the lines into chunks and map over those, adjusting the chunk size until we find a sweet spot. Our function parses a chunk of lines:

def _parse_coreset_points(lines):
    return Points([_parse_coreset_point(line) for line in lines])

def _parse_coreset_point(line):
    s = line.split()
    x, y = [int(v) for v in s]
    return CoresetPoint(x, y)

And our main function:

import fileinput
from itertools import islice

def _chunked(lines, size):
    # Group the line iterator into lists of `size` lines for the workers.
    it = iter(lines)
    return iter(lambda: list(islice(it, size)), [])

def getParsedFiles(directory):
    pool = Pool(2)

    txts = [os.path.join(directory, filename)
            for filename in os.listdir(directory)
            if filename.endswith(".txt")]

    # Each worker receives a chunk of 100 lines; tune the size until you find a sweet spot.
    return pool.imap(_parse_coreset_points, _chunked(fileinput.input(txts), 100))
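
Note that unlike map, imap returns a lazy iterator, so the caller has to iterate over it (or wrap it in list(...)) to actually drive the parsing.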
Peter Wood
  • But the files are there; the list sent to the function contains all the file names. It took the program 0.1 s to put all the file names in the list. Where is the I/O taking effect here? The pools are not waiting for the names to be extracted and put in the list before they can go on. – Tony Tannous Mar 06 '17 at 08:29
  • You're reading from the files. I don't know how your disk works, but I imagine you can only read one file at a time. – Peter Wood Mar 06 '17 at 08:37
  • See [Is it a good idea to read multiple files at the same time?](http://stackoverflow.com/questions/28614953/is-it-a-good-idea-to-read-multiple-files-at-the-same-time) – Peter Wood Mar 06 '17 at 08:46
  • @TonyTannous probably, yes would be the default answer. It may be SSDs are different, or your operating system might be hiding multiple drives behind a uniform file system and you could access them in parallel. But if there is one mechanical drive, sequential is fastest. – Peter Wood Mar 06 '17 at 08:53
  • If your data is really just line based and split over multiple files, you could probably just do a line at a time. Will update answer. – Peter Wood Mar 06 '17 at 10:02
  • I've updated to use chunks of lines. You can play with the chunksize until you get something reasonable. It's currently 100 lines. – Peter Wood Mar 07 '17 at 09:22

In general it is never a good idea to read from the same physical (spinning) hard disk from different threads simultaneously, because every switch causes an extra delay of around 10ms to position the read head of the hard disk (would be different on SSD).

As @peter-wood already said, it is better to have one thread reading in the data, and have other threads processing that data.

Also, to really test the difference, I think you should do the test with some bigger files. For example: current hard disks should be able to read around 100MB/sec. So reading the data of a 100kB file in one go would take 1ms, while positioning the read head to the beginning of that file would take 10ms.

On the other hand, looking at your numbers (assuming those are for a single loop) it is hard to believe that being I/O bound is the only problem here. Total data is 100MB, which should take about 1 second to read from disk plus some overhead, yet your program takes 130 seconds. I don't know if that number is with the files cold on disk, or an average of multiple tests where the data is already cached by the OS (with 62 GB of RAM, all that data should be cached the second time) - it would be interesting to see both numbers.
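
To put rough numbers on that argument (just the figures from this answer and the question):

# ~10 ms head positioning per file, ~100 MB/s sequential read,
# 1000 files, ~100 MB of data in total:
seek_time = 1000 * 0.010      # ~10 s spent positioning the head
read_time = 100 / 100.0       # ~1 s of actual sequential reading
print(seek_time + read_time)  # ~11 s -- nowhere near the observed ~130 s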

So there has to be something else. Let's take a closer look at your loop:

for line in f:
    s = line.split()
    x, y = [int(v) for v in s]
    obj = CoresetPoint(x, y)
    gc.disable()
    myList.append(obj)
    gc.enable()

While I don't know Python, my guess would be that the gc calls are the problem here. They are called for every line read from disk. I don't know how expensive those calls are (or whether gc.enable() triggers a garbage collection, for example) or why they would be needed around append(obj) only, but there might be other problems because this is multithreading:

Assuming the gc object is global (i.e. not thread local) you could have something like this:

thread 1 : gc.disable()
# switch to thread 2
thread 2 : gc.disable()
thread 2 : myList.append(obj)
thread 2 : gc.enable()
# gc now enabled!
# switch back to thread 1 (or one of the other threads)
thread 1 : myList.append(obj)
thread 1 : gc.enable()

And if the number of threads <= the number of cores, there wouldn't even be any switching; they would all be calling this at the same time.

Also, if the gc object is thread safe (it would be worse if it isn't) it would have to do some locking in order to safely alter its internal state, which would force all other threads to wait.

For example, gc.disable() would look something like this:

def disable():
    lock()  # all other threads are blocked for gc calls now
    alter internal data
    unlock()

And because gc.disable() and gc.enable() are called in a tight loop, this will hurt performance when using multiple threads.

So it would be better to remove those calls, or place them at the beginning and end of your program if they are really needed (or only disable gc at the beginning, no need to do gc right before quitting the program).
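
For example, a sketch of the question's _parse with the gc calls hoisted out of the per-line loop (imports as in the question):

def _parse(pathToFile):
    myList = []
    gc.disable()                 # once per file instead of once per line
    try:
        with open(pathToFile) as f:
            for line in f:
                x, y = [int(v) for v in line.split()]
                myList.append(CoresetPoint(x, y))
    finally:
        gc.enable()              # re-enable afterwards (if it is needed at all)
    return Points(myList)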

Depending on the way Python copies or moves objects, it might also be slightly better to use myList.append(CoresetPoint(x, y)).

So it would be interesting to test the same on one 100MB file with one thread and without the gc calls.

If the processing takes longer than the reading (i.e. not I/O bound), use one thread to read the data in a buffer (should take 1 or 2 seconds on one 100MB file if not already cached), and multiple threads to process the data (but still without those gc calls in that tight loop).

You don't have to split the data into multiple files in order to be able to use threads. Just let them process different parts of the same file (even with the 14GB file).
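
A rough sketch of that idea, assuming one big line-based file and reusing _parse_coreset_points from the first answer: read the file once sequentially, split the lines into roughly equal parts, and let a worker pool parse the parts:

from multiprocessing import Pool

def parse_one_big_file(path, workers=4):
    with open(path) as f:
        lines = f.readlines()                          # one sequential read
    step = max(1, (len(lines) + workers - 1) // workers)
    parts = [lines[i:i + step] for i in range(0, len(lines), step)]
    with Pool(workers) as pool:
        return pool.map(_parse_coreset_points, parts)  # parse the parts in parallel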

Danny_ds
  • `gc.disable()` and `gc.enable()` are commands to disable/enable the garbage collector. When appending to a huge list, it slows down as the list gets bigger, so a solution is to disable and then enable the garbage collector. I don't know why it takes 3 minutes if you say it should take 1 minute. Perhaps turning this into a data object and then appending it to a list is not cheap. Plus calling this from a different module. But thanks for the answer, the first part was superb. +1 – Tony Tannous Mar 06 '17 at 18:44
  • @TonyTannous Yes, but there will be conflicts/blocks due to the multiple threads. Maybe try to disable once before calling `getParsedFiles()`. Even before and after each loop won't work because another thread could call enable while the first is still in the loop. – Danny_ds Mar 06 '17 at 18:50
  • @TonyTannous The reading of one 100MB file from disk should take one (or two) seconds, not minutes :) – Danny_ds Mar 06 '17 at 18:52
  • I will give it a try tomorrow once I am back in the lab. One/two seconds! Then I don't know why it takes 3 minutes with my pretty simple program... I am using the `thrift` framework and I am defining the types with thrift types, so perhaps that is a reason too. – Tony Tannous Mar 06 '17 at 18:54

A copy-paste snippet, for people who come from Google and don't like reading

The example is for JSON reading; just replace __single_json_loader with a loader for another file type to work with that.

from multiprocessing import Pool
from typing import Callable, Any, Iterable
import os
import json


def parallel_file_read(existing_file_paths: Iterable[str], map_lambda: Callable[[str], Any]):
    result = {p: None for p in existing_file_paths}
    pool = Pool()
    for temp_result, path in zip(pool.imap(map_lambda, existing_file_paths), result.keys()):
        result[path] = temp_result
    pool.close()
    pool.join()
    return result


def __single_json_loader(f_path: str):
    with open(f_path, "r") as f:
        return json.load(f)


def parallel_json_read(existing_file_paths: Iterable[str]):
    combined_result = parallel_file_read(existing_file_paths, __single_json_loader)
    return combined_result

And usage


if __name__ == "__main__":
    def main():
        directory_path = r"/path/to/my/file/directory"
        assert os.path.isdir(directory_path)

        all_files_names = os.listdir(directory_path)
        all_files_paths = [os.path.join(directory_path, f_name) for f_name in all_files_names]
        assert(all(os.path.isfile(p) for p in all_files_paths))

        combined_result = parallel_json_read(all_files_paths)

    main()

It is very straightforward to replace the JSON reader with any other reader, and you're done.
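
For instance, a hypothetical plain-text loader dropped in place of the JSON one:

def __single_text_loader(f_path: str):
    # Read the whole file as a string instead of parsing JSON.
    with open(f_path, "r") as f:
        return f.read()

# combined_result = parallel_file_read(all_files_paths, __single_text_loader)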

Gulzar