3

I have a large tsv file (~2.5 GB). I iterate over each line, where each line has 6 tabs. I take the first field of each line and append the whole line to a csv file named after that first field. The goal is to end up with the master tsv's lines sorted out into separate csv files keyed on that first field.

This works on a small-scale file, but when I run it on the large file the IPython console never finishes. The file I am saving to looks as if it is being filled, but when I open it nothing is shown.

import csv

file_path = ".../master.tsv"

with open(file_path, 'r') as masterfile:
    for line in masterfile:
        line_split = line.split("|")
        cik = line_split[0].zfill(10)

        save_path = ".../data-sorted/"
        save_path += cik + ".csv"

        with open(save_path, 'a') as savefile:
            wr = csv.writer(savefile, quoting=csv.QUOTE_ALL)
            wr.writerow(line_split)
  • Did you try pandas? – Quang Hoang Dec 27 '18 at 17:40
  • Sounds like it's trying to load the entire file into memory – Maximilian Burszley Dec 27 '18 at 17:41
  • 2
    @TheIncorrigible1 It shouldn't. [this answer](https://stackoverflow.com/a/8010133/1491895) says that the file iterable uses buffered I/O so it doesn't load everything into memory. – Barmar Dec 27 '18 at 17:43
  • 2
    This code might just be really slow because it has to open and close the output file on every iteration. – Barmar Dec 27 '18 at 17:44
  • @Barmar Not sure if IPython has a different implementation. Also OP, you say it's TSV but you're splitting on pipe? – Maximilian Burszley Dec 27 '18 at 17:44
  • 1
    @TheIncorrigible1 IPython is a library, not a different implementation of Python. – Alex Hall Dec 27 '18 at 17:46
  • @Barmar but OP says that when he opens an output file he sees nothing, not just that the whole process is slow. – Alex Hall Dec 27 '18 at 17:47
  • 2
    "The file I am saving to looks as if it is being filled" what does that mean? – Alex Hall Dec 27 '18 at 17:48
  • If the file is too large, you can use `dask.dataframe.read_csv()` to read the csv file. – Rish Dec 27 '18 at 18:14
  • 1
    It looks like you're saving to a different file every iteration since you're using the first element of line_split to determine the save path. – jonyfries Dec 27 '18 at 18:40
  • 1
    To put context behind what I'm trying to do I'm pulling quarterly SEC filings from 1993 (using: https://github.com/edouardswiac/python-edgar). I have a master file stitched together so there will be roughly 4 filings per CIK over 25 years. Essentially I'm expecting some 60,000 csv files with around ten lines per csv all coming from the master tsv. When I say the file looks like it filled I mean when I run Get Info the file has 67,000 items, but when I open it in Finder there is nothing shown. – user2544427 Dec 27 '18 at 21:05
  • Try opening and reading the file in binary mode using python. Is the data there? Do the line endings look okay? (A minimal sketch of that check follows these comments.) The only problem I see is that you should let the CSV module handle newlines (do `open(save_path, 'a', newline='')`) but otherwise, considering the large number of output files, this seems like a good solution to me. – tdelaney Dec 27 '18 at 22:33
  • user2544427: FWIW, I downloaded and concatenated together just the last 2 years' worth of the edgar data (2017–2018). The resulting ~257 MB file had 1,911,250 lines and contained 176,487 unique CIKs, which would correspond to creating that many different CSV files. For 25 years, there would definitely be even more... – martineau Dec 28 '18 at 21:08
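
A minimal sketch of the binary-mode check tdelaney suggests above; the specific output file name used here is a hypothetical example, not one of the real generated files:

# Read a sample of raw bytes from one generated CSV to confirm the data is
# there and that the line endings look sane (file name is a hypothetical example).
with open(".../data-sorted/0000320193.csv", 'rb') as f:
    print(f.read(200))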

2 Answers

2

Your code is very inefficient in the sense that it opens a file and appends data to it for every line/row of the input file it processes, which it will do a tremendous number of times if the input file really is that huge (the OS calls needed to open and close files are relatively slow).

Plus there's at least one bug in your code I noticed, namely this line:

save_path += cik + ".csv"

which just keeps making save_path longer and longer, which is not what you want.

Anyway, here's something that should work faster, although it will likely still take a fairly long time to process a file that big. It speeds the process up by caching the open output files and their associated csv.writer objects: each output csv file is opened, and its writer created, as infrequently as possible, i.e. the first time it is needed and again only if it had to be closed because the cache reached its maximum size.

Note that the cache may itself consume a lot of memory, depending on how many unique csv output files there are and how many of them can be open at the same time, but using a lot of memory is what makes it run faster. You'll need to experiment and manually adjust the MAX_OPEN value to find the best trade-off between speed and memory usage, all while staying below your OS's limit on how many files can be open at one time.
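
As an aside, here's a minimal sketch (assuming a Unix-like system, where the standard resource module is available) of how you could check that OS limit before picking a MAX_OPEN value:

import resource

# Per-process limit on the number of open file descriptors (Unix only).
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print('open-file limits: soft={}, hard={}'.format(soft, hard))

# MAX_OPEN should stay somewhat below the soft limit, since stdin/stdout and
# the input file also consume descriptors. If needed, the soft limit can be
# raised up to the hard limit, e.g.:
# resource.setrlimit(resource.RLIMIT_NOFILE, (min(4096, hard), hard))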

Also note that it might be possible to make this even more efficient by choosing which open file entry to close more intelligently, rather than just picking an open one at random (a rough sketch of one such alternative appears after the code below). However, whether doing that would really help depends on whether there's any sort of advantageous grouping or other ordering to the data in the input file.

import csv
import os
import random

class CSVWriterCache(dict):
    """ Dict subclass to cache pairs of csv files and associated
        csv.writers. When a specified maximum number of them already
        exist, a random one closed, but an entry for it is retained
        and marked "closed" so it can be re-opened in append mode
        later if it's ever referenced again. This limits the number of
        files open at any given time.
    """
    _CLOSED = None  # Marker to indicate a file has been seen before but is now closed.

    def __init__(self, max_open, **kwargs):
        self.max_open = max_open
        self.cur_open = 0  # Number of currently opened csv files.
        self.csv_kwargs = kwargs  # keyword args for csv.writer.

    # Adding the next two non-dict special methods makes the class a
    # context manager which allows it to be used in "with" statements
    # to do automatic clean-up.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __getitem__(self, k):
        if k not in self:
            return self.__missing__(k)

        entry = self.get(k)
        if entry is self._CLOSED:  # Seen before, but its file was closed.
            if self.cur_open == self.max_open:  # Limit reached?
                self._close_random_entry()
            # Re-open the file in append mode and put it back in the cache.
            csv_file = open(k, 'a', newline='')
            csv_writer = csv.writer(csv_file, **self.csv_kwargs)
            self.cur_open += 1
            super().__setitem__(k, (csv_writer, csv_file))
            entry = csv_writer, csv_file

        return entry

    def __missing__(self, csv_file_path):
        """ Create a csv.writer corresponding to the file path and add it
            and the file to the cache.
        """
        if self.cur_open == self.max_open:  # Limit reached?
            self._close_random_entry()

        csv_file = open(csv_file_path, 'w', newline='')
        csv_writer = csv.writer(csv_file, **self.csv_kwargs)
        self.cur_open += 1

        # Add pair to cache.
        super().__setitem__(csv_file_path, (csv_writer, csv_file))
        return csv_writer, csv_file

    def _close_random_entry(self):
        """ Randomly choose a cached entry whose file is still open, close
            that file, and mark the entry _CLOSED. The entry is left in the
            dictionary so the file path can be recognized as having been
            seen before and be re-opened in append mode later.
        """
        while True:
            rand_key = random.choice(tuple(self.keys()))
            entry = self.get(rand_key)
            if entry is not self._CLOSED:
                break
        entry[1].close()  # Close the associated csv file.
        self.cur_open -= 1
        self[rand_key] = self._CLOSED  # Mark as previously seen but closed.

    # Added, non-standard dict method.
    def close(self):
        """ Close all the opened files in the cache and clear it out. """
        for key, entry in self.items():
            if entry is not self._CLOSED:
                entry[1].close()
                self[key] = self._CLOSED  # Not strictly necessary.
                self.cur_open -= 1  # For sanity check at end.
        self.clear()
        assert(self.cur_open == 0)  # Sanity check.

if __name__ == '__main__':
    file_path = "./master.tsv"
    save_path = "./data-sorted"
    MAX_OPEN  = 1000  # Number of opened files allowed (max is OS-dependent).
#    MAX_OPEN  = 2  # Use small value for testing.

    # Create output directory if it does not exist.
    if os.path.exists(save_path):
        if not os.path.isdir(save_path):
            raise RuntimeError("Path {!r} exists, but isn't a directory".format(save_path))
    else:
        print('Creating directory: {!r}'.format(save_path))
        os.makedirs(save_path)

    # Process the input file using a cache of csv.writers.
    with open(file_path, 'r') as masterfile, \
         CSVWriterCache(MAX_OPEN, quoting=csv.QUOTE_ALL) as csv_writer_cache:
        for line in masterfile:
            line_split = line.rstrip().split("|")
            cik = line_split[0].zfill(10)

            save_file_path = os.path.join(save_path, cik + ".csv")
            writer = csv_writer_cache[save_file_path][0]
            writer.writerow(line_split)

    print('{!r} file processing completed'.format(os.path.basename(file_path)))
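
As mentioned above, a smarter eviction policy than random choice might help if the input has some useful ordering. For what it's worth, here is a minimal, illustrative sketch of a least-recently-used (LRU) variant; the LRUWriterCache name and its writer() method are hypothetical and are not part of the class above:

import csv
from collections import OrderedDict

class LRUWriterCache:
    """ Keep at most max_open output files open; when the limit is reached,
        close the least recently used one instead of a random one.
    """
    def __init__(self, max_open, **csv_kwargs):
        self.max_open = max_open
        self.csv_kwargs = csv_kwargs
        self.open_files = OrderedDict()  # path -> (writer, file), most recent last.
        self.seen = set()                # Paths that have been written at least once.

    def writer(self, path):
        if path in self.open_files:
            self.open_files.move_to_end(path)  # Mark as most recently used.
            return self.open_files[path][0]
        if len(self.open_files) >= self.max_open:
            _, (_, old_file) = self.open_files.popitem(last=False)  # Evict LRU entry.
            old_file.close()
        mode = 'a' if path in self.seen else 'w'  # Re-open previously seen files in append mode.
        csv_file = open(path, mode, newline='')
        self.open_files[path] = (csv.writer(csv_file, **self.csv_kwargs), csv_file)
        self.seen.add(path)
        return self.open_files[path][0]

    def close(self):
        for _, csv_file in self.open_files.values():
            csv_file.close()
        self.open_files.clear()

# Usage would mirror the cache above, e.g.:
#     cache = LRUWriterCache(MAX_OPEN, quoting=csv.QUOTE_ALL)
#     cache.writer(save_file_path).writerow(line_split)
#     ...
#     cache.close()

An LRU policy only pays off if lines for the same CIK tend to appear close together in the input; if the CIKs are effectively in random order, it should behave about the same as random eviction.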
martineau
  • I think this is the solution; however, I get: "[Errno 23] Too many open files in system:" I tried [this](https://stackoverflow.com/q/16526783/2544427) but even when I ulimit -n 10000 I got the same error. – user2544427 Dec 27 '18 at 21:32
  • user2544427: I don't think that linked solution really applies in this case since you're not using `subprocess`. The cached approach used in this answer is keeping all the files open until the entire input file has been processed—and apparently that's more than your OS can handle. I think it would be possible to make the caching "smarter" and limit the number it actually stores to some maximum. If I have a chance, I'll work on that and update my answer accordingly. You could also try doing something like that yourself... – martineau Dec 27 '18 at 21:40
  • user2544427: OK, I've updated my answer so the cache class now has a length-limit argument that controls how many files it will allow to be open at one time. I also made it usable in a `with` statement, which makes clean-up automatic. – martineau Dec 28 '18 at 02:31
  • user2544427: My last update doesn't work correctly. I had been testing it with a relatively small input file I created manually, but a much larger one causes a "Too many open files" exception to occur, and the currently posted version doesn't handle it properly. I'll work on it some more and try to fix the problem. – martineau Dec 29 '18 at 03:15
0

Assuming you have enough RAM, you may be better off grouping the lines in memory, e.g. in a dictionary keyed on the first field, and writing everything to disk in one pass at the end. If I/O is indeed your bottleneck, you should get a lot of mileage out of opening each output file only once.

import csv
from collections import defaultdict
from os.path import join

file_path = ".../master.tsv"

# Group the lines by their first field (the CIK), entirely in memory.
data = defaultdict(list)
with open(file_path, 'r') as masterfile:
    for line in masterfile:
        cik = line.split("|", 1)[0].zfill(10)
        data[cik].append(line)

# Write each group out, opening each output file exactly once.
for cik, lines in data.items():
    save_path = join(".../data-sorted", cik + ".csv")

    with open(save_path, 'w', newline='') as savefile:
        wr = csv.writer(savefile, quoting=csv.QUOTE_ALL)
        for line in lines:
            wr.writerow(line.rstrip('\n').split("|"))

It is possible that you will not have enough memory to load the entire file. In that case you can process it in chunks which, if they are large enough, should still save you a significant amount of I/O. The chunking method below is very quick and dirty.

import csv
from collections import defaultdict
from itertools import groupby
from os.path import join

chunk_size = 10000  # Units of lines.

file_path = ".../master.tsv"

with open(file_path, 'r') as masterfile:
    # Group consecutive lines into chunks of chunk_size lines by integer-
    # dividing each line's index by the chunk size.
    for _, chunk in groupby(enumerate(masterfile),
                            key=lambda item: item[0] // chunk_size):
        data = defaultdict(list)
        for _, line in chunk:  # Each item is an (index, line) pair.
            cik = line.split("|", 1)[0].zfill(10)
            data[cik].append(line)
        for cik, lines in data.items():
            save_path = join(".../data-sorted", cik + ".csv")

            # Append, since the same CIK can show up in more than one chunk.
            with open(save_path, 'a', newline='') as savefile:
                wr = csv.writer(savefile, quoting=csv.QUOTE_ALL)
                for line in lines:
                    wr.writerow(line.rstrip('\n').split("|"))
Mad Physicist
  • Hmm, aside from a few otherwise minor typographical errors in your code, I keep getting a `SyntaxError: invalid syntax` on the `key=lambda item: item[0] // chunk_size)` part of the `groupby()` call, which I don't know how to fix. I'm not entirely sure, because it looks OK to me, but I think it could be due to a bug in the Python interpreter itself (or perhaps just in my own understanding of how the language is parsed). – martineau Dec 28 '18 at 05:44
  • @martineau. Will investigate. Feel free to correct the typos if you like. I'm currently on mobile, so my code is untested. – Mad Physicist Dec 28 '18 at 05:47
  • 1
    @martineau. There was a missing colon at the end of the for loop. Fixed now. – Mad Physicist Dec 28 '18 at 05:50
  • Ah, that fixes that part. As for fixing the other typos, I'll leave that to you because there are unrelated issues arising after I tried to fix them as well. Both approaches look promising, btw. – martineau Dec 28 '18 at 05:57
  • @martineau. Thanks. I've fixed some missing parens, will test the rest tomorrow after some sleep. – Mad Physicist Dec 28 '18 at 06:10