When I run the code below on a large corpus of parquet files (many gigabytes), the memory usage of each worker steadily increases: from roughly 3% per core at 10 minutes, to 7% at 30 minutes, to over 10% at 6 hours. This happens despite my explicit call to gc.collect(), despite the files all being approximately the same size, and despite only one file being loaded at a time. I don't see any persistent references to old files, so I am stumped.
This problem forces me to limit the number of cores I use to tokenize my data, which doubles my processing time.
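A minimal way to confirm that the growth is happening inside each worker (rather than in the parent process) would be to log the resident set size after each file inside __iter__ below. This sketch assumes the psutil package, which the script itself does not use; it is purely diagnostic and not part of the pipeline:

import psutil  # assumption: psutil is installed; diagnostic only
rss_mb = psutil.Process(os.getpid()).memory_info().rss / 1e6  # resident set size in MB
print('%s: worker %d RSS = %.1f MB' % (filename, os.getpid(), rss_mb), flush=True)

(filename here is the loop variable in __iter__.) The full script follows.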
from nltk.tokenize import RegexpTokenizer
import pandas as pd
import re
import string
import os
from multiprocessing import Pool
import gc
import glob
# Drop punctuation and ellipses, replace numbers with a placeholder, lowercase the rest.
def adjust_tokens(tokens):
    new_tokens = []
    number_regex = '[0-9]+(?:[.][0-9]+)?'
    for token in tokens:
        if token in string.punctuation:
            continue
        if token == '...':
            continue
        if re.match(number_regex, token):
            new_tokens.append('aquantity')
        else:
            new_tokens.append(token.lower())
    return new_tokens

# Yields one list of adjusted tokens per sentence, reading one parquet file at a time.
class ParquetsIterable:
    def __init__(self, tokenizer=None, indir=None, infiles=None,
                 filters=None, keys=None):
        if tokenizer is not None:
            self.tokenizer = tokenizer
        else:
            self.tokenizer = RegexpTokenizer(
                r'(?:\w+)(?:[.-]\w+)*[-+]?|(?:[.][.][.])'
                + r'|(?:[!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~])'
            )
        if infiles is not None:
            self.infiles = infiles
        else:
            self.infiles = [os.path.join(indir, f) for f in os.listdir(indir)]
        self.filters = filters
        self.keys = keys

    def __iter__(self):
        for filename in self.infiles:
            try:
                df = pd.read_parquet(filename)
            except Exception:
                print('Invalid parquet file %s\n' % filename)
                continue
            if self.keys is not None:
                try:
                    assert (self.keys[filename].values
                            == df['key'].values).all()
                except AssertionError:
                    # This never happens
                    print('Skipping %s because of reordering'
                          ' and my unwillingness to write a join'
                          % filename)
                    continue
            if self.filters is not None:
                df = df[self.filters[filename].values]
            gc.collect()
            for text in df['sentences']:
                for sentence in text.split('\n'):
                    yield adjust_tokens(self.tokenizer.tokenize(sentence))

def write_iterables_to_files(iterables, files, n_cores=8):
    pairs = zip(iterables, files)
    with Pool(n_cores) as p:
        p.map(write_iterable_to_file, pairs)


def write_iterable_to_file(iterable_and_file):
    iterable, file_ = iterable_and_file
    with open(file_, 'w') as f:
        for tokens in iterable:
            f.write(' '.join(tokens) + '\n')

if __name__ == '__main__':
    files = sorted([f for f in glob.glob('data/sentence-parquets/*')
                    if 'parts' in f and 'sampled' not in f])
    os.makedirs('data/processed-sentences-text', exist_ok=True)
    # Split the file list round-robin across 8 iterables, one per worker.
    iterables = [ParquetsIterable(infiles=files[i::8]) for i in range(8)]
    outfiles = ['data/processed-sentences-text/sentences-%d.txt' % i
                for i in range(8)]
    write_iterables_to_files(iterables, outfiles, n_cores=8)