2

I have a large CSV file that I would like to split into a number that is equal to the number of CPU cores in the system. I want to then use multiprocess to have all the cores work on the file together. However, I am having trouble even splitting the file into parts. I've looked all over google and I found some sample code that appears to do what I want. Here is what I have so far:

def split(infilename, num_cpus=multiprocessing.cpu_count()):
    READ_BUFFER = 2**13
    total_file_size = os.path.getsize(infilename)
    print total_file_size
    files = list()
    with open(infilename, 'rb') as infile:
        for i in xrange(num_cpus):
            files.append(tempfile.TemporaryFile())
            this_file_size = 0
            while this_file_size < 1.0 * total_file_size / num_cpus:
                files[-1].write(infile.read(READ_BUFFER))
                this_file_size += READ_BUFFER
        files[-1].write(infile.readline()) # get the possible remainder
        files[-1].seek(0, 0)
    return files

files = split("sample_simple.csv")
print len(files)

for ifile in files:
    reader = csv.reader(ifile)
    for row in reader:
        print row

The two prints show the correct file size and that it was split into 4 pieces (my system has 4 CPU cores).

However, the last section of the code that prints all the rows in each of the pieces gives the error:

for row in reader:
_csv.Error: line contains NULL byte

I tried printing the rows without running the split function and it prints all the values correctly. I suspect the split function has added some NULL bytes to the resulting 4 file pieces but I'm not sure why.

Does anyone know if this a correct and fast method to split the file? I just want resulting pieces that can be read successfully by csv.reader.

martineau
  • 119,623
  • 25
  • 170
  • 301
Colin
  • 415
  • 3
  • 14
  • Do you have null bytes in your file? Print the lines with repr – Padraic Cunningham Jun 19 '15 at 21:47
  • Can I assume no since printing the rows of the original file without splitting is successful? – Colin Jun 19 '15 at 21:47
  • A simple method would be get the line count and islice the file into n slices – Padraic Cunningham Jun 19 '15 at 21:57
  • You can't split a csv file up at some arbitrary point, the file format is line-oriented, so any splitting would have to occur at a boundary between lines — which implies you know where they are. – martineau Jun 19 '15 at 22:10
  • @Colin, if splitting on lines is good enough http://pastebin.com/xR39xkhi – Padraic Cunningham Jun 19 '15 at 22:22
  • 1
    You did ask for splitting CSV files and there are answers already. However you also gave the reasoning of utilizing all CPU cores. Two points on that. You should check whether file I/O or numbercrunching is your bottleneck. You are aware of the [global interpreter lock](http://programmers.stackexchange.com/questions/186889/why-was-python-written-with-the-gil)? – stefan Jun 20 '15 at 06:57
  • @PadraicCunningham Yes splitting by lines is fine by me as long as there are #core files left at the end with all the data intact. What does '1 for _' do? How about 'lines[-1] += islice(f, None)'? Thanks. – Colin Jun 22 '15 at 03:32
  • @stefan How can I confirm that I/O or number-crunching is my bottleneck? I suspected this was the case as the bulk of my program's time is taken up from processing the csv file line by line. What I'm doing is calculating a multiplication of values from two columns + a moving average of this result. – Colin Jun 22 '15 at 03:35

1 Answers1

5

As I said in a comment, csv files would need to be split on row (or line) boundaries. Your code doesn't do this and potentially breaks them up somewhere in the middle of one — which I suspect is the cause of your _csv.Error.

The following avoids doing that by processing the input file as a series of lines. I've tested it and it seems to work standalone in the sense that it divided the sample file up into approximately equally size chunks because it's unlikely that an whole number of rows will fit exactly into a chunk.

Update

This it is a substantially faster version of the code than I originally posted. The improvement is because it now uses the temp file's own tell() method to determine the constantly changing length of the file as it's being written instead of calling os.path.getsize(), which eliminated the need to flush() the file and call os.fsync() on it after each row is written.

import csv
import multiprocessing
import os
import tempfile

def split(infilename, num_chunks=multiprocessing.cpu_count()):
    READ_BUFFER = 2**13
    in_file_size = os.path.getsize(infilename)
    print 'in_file_size:', in_file_size
    chunk_size = in_file_size // num_chunks
    print 'target chunk_size:', chunk_size
    files = []
    with open(infilename, 'rb', READ_BUFFER) as infile:
        for _ in xrange(num_chunks):
            temp_file = tempfile.TemporaryFile()
            while temp_file.tell() < chunk_size:
                try:
                    temp_file.write(infile.next())
                except StopIteration:  # end of infile
                    break
            temp_file.seek(0)  # rewind
            files.append(temp_file)
    return files

files = split("sample_simple.csv", num_chunks=4)
print 'number of files created: {}'.format(len(files))

for i, ifile in enumerate(files, start=1):
    print 'size of temp file {}: {}'.format(i, os.path.getsize(ifile.name))
    print 'contents of file {}:'.format(i)
    reader = csv.reader(ifile)
    for row in reader:
        print row
    print ''
martineau
  • 119,623
  • 25
  • 170
  • 301
  • Thanks for your help. This code indeed works but on a 130MB file, it took almost 20 minutes. I work often with files that go up to 50GB. Is there a way to make it more efficient? It seems there are a lot of hard drive accesses. – Colin Jun 22 '15 at 03:20
  • 2
    @Colin: The dividing up of the file is inherently a time-consuming process because, at a minimum, it involves reading and writing the the whole file's data. There was a noticeable slow-down when I added the `os.fsync()` as per the docs, even though it seemed to be working without it on my system. If less equally sized temp files where acceptable, you could just compare sizes every other, or every third line. Another approach would be to start with the mathematically exact division points and then adjust each of them by reading forward from that position to the nearest line-break. – martineau Jun 22 '15 at 11:40
  • 1
    @Colin: Do you really need to physically split the file up? It quite possible to have multiple processes reading the same file simultaneously. – martineau Jun 22 '15 at 11:45
  • 1
    Thanks it is much faster now. Took about 2.5s with the updated code. I do not care about the exact size of each chunk as long as they are roughly the same. Originally I thought I'd split up the files so I can feed them to each CPU core. My goal is to analyze the csv file and multiply certain columns together and then find a moving average of it. If I access the same file with 4 cores simultaneously, will there be performance implications? – Colin Jun 22 '15 at 15:01
  • I'd don't think my previous suggestion of not checking the size on every line would have much effect on the current code, which I suspect is now I/O-bound. The only way around that would be to get rid of it, which is why I suggested just letting each process access the same file, I don't think there would significant performance issues doing that per se. There'll still be some overhead involved keeping track of what part of the file each process should limit itself to processing — which means you still need to know where the row boundaries are situated with respect to each file "chunk". – martineau Jun 22 '15 at 17:17
  • In order to access the file using multiple processes, would I just open the file using the csv reader in each process and have them jump to the appropriate line in the file? I would like to compare the performance of that versus splitting the file up and see which is more efficient. On a side note: With the current code, I tried changing the CPU speed between 1.4GHz and 1.9GHz and I can see that it affects the splitting time as well as the crunching time greatly. This would suggest that it's still CPU-bound. – Colin Jun 22 '15 at 19:58
  • Yes, that would be the basic approach to having multiple processes accessing the same file — except that each process will also need to know when to stop (reading its portion of the file). If the file-splitting approach is still CPU-bound, then my previous suggestions may help — to really know where most of the execution time is being spent, you should profile your code. It's fairly easy to do with the built-in `profile` (or `cProfile`) module. – martineau Jun 22 '15 at 22:08
  • It might be possible to speed up the current version even further by not using `temp_file.tell()` and instead keep track of how many bytes have been written to the file so far yourself — thereby avoiding at least the function-call overhead if not more. Just add the length of the number of bytes `infile.next()` is returning to a `bytes_written` accumulater (sort of like what you do in the code in your question). – martineau Jun 22 '15 at 22:31
  • I ended up spending a lot of time trying to debug pickling issues related with the 'split to file chunks' approach I originally wanted to use. It seems files are not one of the things you can pass to processes. I've since then started investigating the other method you suggested, and that is to have all the processes access the same file. While I was able to get this to work, I'm running into performance issues with islice. I will post another thread to ask for help regarding this. – Colin Jul 04 '15 at 22:42
  • Seems like all you'd have to do is pass the chunks' filenames to the other processes rather than the files themselves (which is not something you'd want to pickle even if you could). I'm unsure exactly how you're using `islice` to access the "chunks" in the other processes, but can certainly understand how it could be rather inefficient if done naively. Let me know when you post the other question. – martineau Jul 04 '15 at 23:35
  • I've posted the question here: http://stackoverflow.com/questions/31225782/python-performance-issues-with-islice. I ended up using the suggestion in one of the links that builds a file index so that seek() can be used. That seems to have resolved my performance issues for now. However, I'm only getting about a 50% reduction in speed when using 4 cores compared to 1 core. I'm now investigating why that is. I may revisit your suggestion above regarding passing filenames. How would I associate a filename to each of the chunks based on the code you provided? – Colin Jul 05 '15 at 18:20
  • You can retrieve the name of a temporary file by using `tempfile.NamedTemporaryFile()` and accessing the `.name` attribute of the result. As for seeking, building a file index for every line in the file is somewhat wasteful when you're only going to need a very small number of them. Nevertheless, it seems like you would be getting closer to a 25% reduction in processing time with 4 cores. – martineau Jul 05 '15 at 19:57