Your code is very inefficient in the sense that it opens a file and appends data to it for each line/row of the input file it processes, which will happen a tremendous number of times if the input file really is that huge (the OS calls needed to repeatedly open and close a file are relatively slow).
Plus there's at least one bug in your code that I noticed, namely this line:

save_path += cik + ".csv"

which just keeps making save_path longer and longer... not what is needed.
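The fix, which is what the code below does, is to build the output path fresh for every row with os.path.join() instead of repeatedly appending to save_path:

save_file_path = os.path.join(save_path, cik + ".csv")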
Anyway, here's something that should work faster, although it will likely still take a fairly long time to process a file that big. It speeds the process up by caching the open output csv files and their associated csv.writer objects, opening each one as infrequently as possible: the first time it's needed, and again only if it got closed because the cache reached its maximum size.
Note that the cache may consume a lot of memory itself, depending on how many unique csv output files there are and how many of them are kept open at the same time, but using that memory is what makes it run faster. You'll need to experiment and manually adjust the MAX_OPEN value to find the best trade-off between speed and memory usage, while staying below your OS's limit on how many files are allowed to be open at one time.
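If you want to see what that limit is on your system: on Linux or macOS the standard resource module reports it (this is just an illustration, not part of the script below, and resource isn't available on Windows):

import resource

# The soft limit is what currently applies to this process; the hard limit
# is the ceiling the soft limit could be raised to.
soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
print(soft_limit, hard_limit)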
Also note that it might be possible to make it even more efficient by more intelligently choosing which existing file entry to close, rather than just picking an open one at random (there's a rough sketch of that idea at the end of this answer). However, whether doing that would really help depends on whether there's any advantageous grouping or other ordering to the data in the input file.
import csv
import os
import random
class CSVWriterCache(dict):
    """ Dict subclass to cache pairs of csv files and associated
        csv.writers. When a specified maximum number of them already
        exist, a random one is closed, but an entry for it is retained
        and marked "closed" so it can be re-opened in append mode
        later if it's ever referenced again. This limits the number of
        files open at any given time.
    """
    _CLOSED = None  # Marker: file has been seen before but is now closed.

    def __init__(self, max_open, **kwargs):
        self.max_open = max_open
        self.cur_open = 0  # Number of currently opened csv files.
        self.csv_kwargs = kwargs  # Keyword args for csv.writer.

    # Adding the next two non-dict special methods makes the class a
    # context manager which allows it to be used in "with" statements
    # to do automatic clean-up.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __getitem__(self, k):
        if k not in self:
            return self.__missing__(k)
        else:
            try:
                csv_writer, csv_file = self.get(k)
            except TypeError:  # Entry is _CLOSED, so re-open file in append mode.
                if self.cur_open == self.max_open:  # At the limit?
                    self._close_random_entry()
                csv_file = open(k, 'a', newline='')
                csv_writer = csv.writer(csv_file, **self.csv_kwargs)
                self.cur_open += 1
                # Store the re-opened pair back in the cache so it gets
                # tracked (and eventually closed) like any other open entry.
                super().__setitem__(k, (csv_writer, csv_file))
            return csv_writer, csv_file

    def __missing__(self, csv_file_path):
        """ Create a csv.writer corresponding to the file path and add it
            and the file to the cache.
        """
        if self.cur_open == self.max_open:  # At the limit?
            self._close_random_entry()
        csv_file = open(csv_file_path, 'w', newline='')
        csv_writer = csv.writer(csv_file, **self.csv_kwargs)
        self.cur_open += 1
        # Add pair to cache.
        super().__setitem__(csv_file_path, (csv_writer, csv_file))
        return csv_writer, csv_file

    def _close_random_entry(self):
        """ Randomly choose a cached entry with a previously seen file path
            that is still open (not _CLOSED) and close its file. The entry
            for the file path is left in the dictionary so it can be
            recognized as having been seen before and be re-opened in
            append mode later.
        """
        while True:
            rand_key = random.choice(tuple(self.keys()))
            # Use dict.get() to read the raw cached value so the overridden
            # __getitem__ above doesn't re-open a closed entry here.
            rand_entry = self.get(rand_key)
            if rand_entry is not self._CLOSED:
                break
        rand_entry[1].close()  # Close the associated csv file.
        self.cur_open -= 1
        self[rand_key] = self._CLOSED  # Mark as previously seen but closed.

    # Added, non-standard dict method.
    def close(self):
        """ Close all the opened files in the cache and clear it out. """
        for key, entry in self.items():
            if entry is not self._CLOSED:
                entry[1].close()
                self[key] = self._CLOSED  # Not strictly necessary.
                self.cur_open -= 1  # For sanity check at end.
        self.clear()
        assert self.cur_open == 0  # Sanity check.
if __name__ == '__main__':

    file_path = "./master.tsv"
    save_path = "./data-sorted"
    MAX_OPEN = 1000  # Number of opened files allowed (max is OS-dependent).
    # MAX_OPEN = 2  # Use small value for testing.

    # Create output directory if it does not exist.
    if os.path.exists(save_path):
        if not os.path.isdir(save_path):
            raise RuntimeError(
                "Path {!r} exists, but isn't a directory".format(save_path))
    else:
        print('Creating directory: {!r}'.format(save_path))
        os.makedirs(save_path)

    # Process the input file using a cache of csv.writers.
    with open(file_path, 'r') as masterfile, \
         CSVWriterCache(MAX_OPEN, quoting=csv.QUOTE_ALL) as csv_writer_cache:
        for line in masterfile:
            line_split = line.rstrip().split("|")
            cik = line_split[0].zfill(10)
            save_file_path = os.path.join(save_path, cik + ".csv")
            writer = csv_writer_cache[save_file_path][0]
            writer.writerow(line_split)

    print('{!r} file processing completed'.format(os.path.basename(file_path)))
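As for the earlier note about closing files more intelligently: below is a rough, untested sketch of one way to do it, closing the least-recently-used open file instead of a random one by subclassing the cache above and overriding its _close_random_entry() helper. The names LRUCSVWriterCache and _last_used are my own inventions (not part of the code above), and whether this actually beats random eviction depends entirely on how the rows in your input file are ordered.

from collections import OrderedDict

class LRUCSVWriterCache(CSVWriterCache):
    """ Like CSVWriterCache, but closes the least-recently-used open
        file instead of a randomly chosen one.
    """
    def __init__(self, max_open, **kwargs):
        super().__init__(max_open, **kwargs)
        self._last_used = OrderedDict()  # Keys ordered least- to most-recently used.

    def __getitem__(self, k):
        entry = super().__getitem__(k)
        self._last_used[k] = None
        self._last_used.move_to_end(k)  # Mark k as most recently used.
        return entry

    def _close_random_entry(self):  # Name kept so the base class calls this version.
        # Walk the keys from least to most recently used and close the
        # first one whose file is still open.
        for key in self._last_used:
            entry = self.get(key)
            if entry is not self._CLOSED:
                entry[1].close()
                self.cur_open -= 1
                self[key] = self._CLOSED
                return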