2

I have the following code that runs really slow. It's a program to split a big file (80 gig) and place it into a tree folder structure for fast lookups. I made several comments in the code to help you understand it.

# Libraries
import os


# Variables
file="80_gig_file.txt"
outputdirectory="sorted"
depth=4 # This is the tree depth


# Preperations
os.makedirs(outputdirectory)

# Process each line in the file
def pipeline(line):
    # Strip symbols from line
    line_stripped=''.join(e for e in line if e.isalnum())
    # Reverse the line
    line_stripped_reversed=line_stripped[::-1]
    file=outputdirectory
    # Create path location in folderbased tree
    for i in range(min((depth),len(line_stripped))):
        file=os.path.join(file,line_stripped_reversed[i])
    # Create folders if they don't exist
    os.makedirs(os.path.dirname(file), exist_ok=True)
    # Name the file, with "-file"
    file=file+"-file"
    # This is the operation that slows everything down. 
    # It opens, writes and closes a lot of small files. 
    # I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
    f = open(file, "a")
    f.write(line)
    f.close()


# Read file line by line and by not loading it entirely in memory
# Here it is possible to work with a queue I think, but how to do it properly without loading too much in memory?
with open(file) as infile:
    for line in infile:
        pipeline(line)

Is there a way to make multithreading work? Because I tried a few examples myself I found online and it put everything in memory causing my computer to freeze multiple times.

javubex
  • 33
  • 6
  • since the bottleneck is HDD access, don't expect a major speedup from parallelization (you might gain something if parallel file access is somehow implemented in your system, but since it's not the CPU that gets hogged, adding more cores won't help) – GPhilo Feb 05 '19 at 11:06
  • Only one core is using 100% and my disk usage is below 4% according to system monitoring. I have a NVMe SSD so I really think there might be room for improvement with multiple cores. – javubex Feb 05 '19 at 11:23
  • That sounds promising then. Does the big file need to stay that way or could you split it into chunks? It'd be much easier to parallelize if split in chunks – GPhilo Feb 05 '19 at 11:36
  • The Big file might be pre-processed and split into chunks. I am not familiar with working with chunks so if you can direct me to a few examples I can investigate on how to tackle this. I was looking at https://www.blopig.com/blog/2016/08/processing-large-files-using-python/, but somehow the last codeblock gives me `ValueError: I/O operation on closed file.` – javubex Feb 05 '19 at 11:58

1 Answers1

3

First, the (IMO) simplest solution

If, as it seems, the lines are completely independent, just split your file in N chunks, pass the filename to open as a program argument and run multiple instances of your current script starting them manually on multiple command lines.

Pros:

  • No need to delve with the multiprocessing, inter-process communication, etc
  • Doesn't need to alter the code too much

Cons:

  • You need to preprocess the big file splitting it into chunks (although this will be much faster than your current execution time, since you won't have an open-close-per-line scenario)
  • You need to start the processes yourself, passing the appropriate filename for each of them

This would be implemented as:

Preprocessing:

APPROX_CHUNK_SIZE = 1e9 #1GB per file, adjust as needed
with open('big_file.txt') as fp:
  chunk_id = 0
  next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
  while next_chunk:
    with open('big_file_{}.txt'.format(chunk_id), 'w') as ofp:
      ofp.writelines(next_chunk)
    chunk_id += 1
    next_chunk = fp.readlines(APPROX_CHUNK_SIZE)

From the readlines docs:

If the optional sizehint argument is present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (possibly after rounding up to an internal buffer size) are read.

Doing it this way won't ensure an even number of lines in all chunks, but will make preprocessing much faster,since you're reading in blocks and not line-by-line. Adapt the chunk size as needed. Also, note that by using readlines we can be sure we won't have lines broken between chunks, but since the function returns a list of lines, we use writelines to write that in our output file (which is equivalent to loop over the list and ofp.write(line)). For the sake of completeness, let me note that you could also concatenate all strings in-memory and call write just once (i.e., do ofp.write(''.join(next_chunk))), which might bring you some (minor) performance benefit, paid in (much) higher RAM usage.

Main script:

The only modifications you need are at the very top:

import sys
file=sys.argv[1]
... # rest of your script here

By using argv you ca pass command-line arguments to your program (in this case, the file to open). Then, just run your script as:

python process_the_file.py big_file_0.txt

This will run one process. Open multiple terminals and run the same command with big_file_N.txt for each and they'll be independent from each other.

Note: I use argv[1] because for all programs the first value of argv (i.e., argv[0]) is always the program name.


Then, the multiprocessing solution

Although effective, the first solution is not quite elegant, especially since you'll have 80 files if you start from a file 80GBs in size.

A cleaner solution is to make use of python's multiprocessing module (important: NOT threading! If you don't know the difference, look up "global interpreter lock" and why multithreading in python doesn't work the way you think it would).

The idea is to have one "producer" process that opens the big file and continuously puts lines from it in a queue. Then, a pool of "consumer" processes that extract from the queue the lines and do the processing.

Pros:

  • One script does everything
  • No need to open multiple terminals and do typing

Cons:

  • Complexity
  • uses inter-process communication, which has some overhead

This would be implemented as follows:

# Libraries
import os
import multiprocessing

outputdirectory="sorted"
depth=4 # This is the tree depth

# Process each line in the file
def pipeline(line):
    # Strip symbols from line
    line_stripped=''.join(e for e in line if e.isalnum())
    # Reverse the line
    line_stripped_reversed=line_stripped[::-1]
    file=outputdirectory
    # Create path location in folderbased tree
    for i in range(min((depth),len(line_stripped))):
        file=os.path.join(file,line_stripped_reversed[i])
    # Create folders if they don't exist
    os.makedirs(os.path.dirname(file), exist_ok=True)
    # Name the file, with "-file"
    file=file+"-file"
    # This is the operation that slows everything down. 
    # It opens, writes and closes a lot of small files. 
    # I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
    f = open(file, "a")
    f.write(line)
    f.close()

if __name__ == '__main__':
    # Variables
    file="80_gig_file.txt"

    # Preperations
    os.makedirs(outputdirectory)
    pool = multiprocessing.Pool() # by default, 1 process per CPU
    LINES_PER_PROCESS = 1000 # adapt as needed. Higher is better, but consumes more RAM

    with open(file) as infile:
        next(pool.imap(pipeline, infile, LINES_PER_PROCESS))
        pool.close()
        pool.join()

The if __name__ == '__main__' line is a barrier to separate code that runs on every process from the one that runs only on the "father". Every process defines pipeline, but only the father actually spawns a pool of workers and applies the function. You find more details about multiprocessing.map here

Edit:

Added closing and joining o the pool to prevent the main process from exiting and killing the children in the process.

Community
  • 1
  • 1
GPhilo
  • 18,519
  • 9
  • 63
  • 89
  • I really appreciate the time and effort you put into this answer. It is really nice explained and properly presented. I tried to run the code on my textfile, but somehow it immediately is finished and the sorted folder is empty. Are you sure that the last lines where you open the file are correct? I think it tries to load everything in memory and exits immediately. – javubex Feb 05 '19 at 13:01
  • That's my bad, I used `imap` instead of `map`. The difference is, `imap` returns an iterable and lazily evaluates the values only when the iterator asks for the next element. Since the code never asked for the next element, nothing happened. Replacing it with `map` is the way to go (and it checks in the quick test I just ran), let me know if this works better! – GPhilo Feb 05 '19 at 14:18
  • FYI, there is a `writelines` as well. – Mark Tolonen Feb 05 '19 at 15:35
  • @MarkTolonen thanks! I wasn't aware of it :) I'll update the code to use it! – GPhilo Feb 05 '19 at 15:45
  • @GPhilo, if you put `print(line)` right under `def pipeline(line):`, for debugging purposes, you can see that the filename "80_gig_file.txt" is printed, letter by letter. It doesn't print the line which should be expected. If I change `pool.map(pipeline, file, LINES_PER_PROCESS)` to `pool.map(pipeline, infile, LINES_PER_PROCESS)` , everything is loaded in memory and I have to kill it before my computer freezes. – javubex Feb 06 '19 at 10:54
  • The `file` vs `infile` typo I forgot to fix in the code here. Yes, indeed you have to put `infile` there, of course. As per the "everything is loaded in memory", what value are you using for `LINES_PER_PROCESS`? Does setting that to 1 still give you problems? – GPhilo Feb 06 '19 at 10:57
  • @GPhilo, I also tried setting it to 1, but I can see my RAM go up (~1 gig every second). I tested it with a small subset (100 lines file), the program does work though. Based on that I assume that the code loads the entire file in memory. Already thanks for all your time you've put into this issue, I really appreciate it and I think we are really, really close. – javubex Feb 06 '19 at 11:39
  • Mhm that's interesting... it seems [`multiprocessing.map` internally calls something like `list(iterable)` for iterables that don't have `__len__`](https://stackoverflow.com/questions/44708312/how-to-use-a-generator-as-an-iterable-with-multiprocessing-map-function), could you make an experiment changing the line with `map` to `next(pool.imap(pipeline, infile, LINES_PER_PROCESS))`? (I tried in my test and I can't quite decide if the `next(..)` is necessary or not) – GPhilo Feb 06 '19 at 11:59
  • @GPhilo, interesting, now it seems to process only 40 items from the top. – javubex Feb 06 '19 at 13:00
  • Ignore my last comment. I think I know why the issues: The main process quits before the workers terminate and this kills them. Please add `pool.close()` and `pool.join()` right after the `pool.imap()` call (no need for next(...) or so) – GPhilo Feb 06 '19 at 13:21
  • You are my hero. https://i.imgur.com/OWEegID.png. Works like a charm. I still had to use the next statement, but the pool closing helped. – javubex Feb 06 '19 at 14:38
  • I'm glad it works! I'm marking the code for when inevitably I'll have to do this again, i's very interesting that `next` is still necessary in your case! In my test the simple call to imap was sufficient.. go figure ;) – GPhilo Feb 06 '19 at 14:42