
When I run the code below on a large corpus of many gigabytes of parquet files, memory usage steadily increases: from roughly 3% per core at 10 minutes, to 7% at 30 minutes, to 10%+ at 6 hours. This happens despite my explicit call to gc.collect(), even though the files are all approximately the same size and only one file should be loaded at a time. I don't see any persistent references to old files, so I am stumped.

This problem forces me to limit the number of cores I use to tokenize my data, doubling my processing time.
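
A minimal way to pin down where the growth happens (a diagnostic sketch only, assuming psutil is installed; the log_rss helper below is hypothetical and not part of the pipeline) is to log the resident set size of each worker process around the read and the tokenization:

import os

import psutil


def log_rss(tag):
    # Print the resident set size of the current process in MiB.
    rss_mib = psutil.Process(os.getpid()).memory_info().rss / 2**20
    print('[pid %d] %s: %.1f MiB' % (os.getpid(), tag, rss_mib))

Calling log_rss(filename) just before and after pd.read_parquet in __iter__ below would show whether the extra memory appears during the read itself or while iterating over the tokenized sentences.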

from nltk.tokenize import RegexpTokenizer
import pandas as pd
import re
import string
import os
from multiprocessing import Pool
import gc
import glob


def adjust_tokens(tokens):
    # Drop punctuation and ellipses, replace numbers with a placeholder
    # token, and lowercase everything else.
    new_tokens = []
    number_regex = '[0-9]+(?:[.][0-9]+)?'
    for token in tokens:
        if token in string.punctuation:
            continue
        if token == '...':
            continue
        if re.match(number_regex, token):
            new_tokens.append('aquantity')
        else:
            new_tokens.append(token.lower())
    return new_tokens


class ParquetsIterable():
    def __init__(self, tokenizer=None, indir=None, infiles=None,
                 filters=None, keys=None):
        if tokenizer is not None:
            self.tokenizer = tokenizer
        else:
            # Match words (optionally joined by '.' or '-', with an optional
            # trailing '+' or '-'), an ellipsis, or a single punctuation mark.
            self.tokenizer = RegexpTokenizer(
                r'(?:\w+)(?:[.-]\w+)*[-+]?|(?:[.][.][.])'
                r'|(?:[!"#$%&\'()*+,\-./:;<=>?@\[\\\]^_`{|}~])'
            )
        if infiles is not None:
            self.infiles = infiles
        else:
            self.infiles = [os.path.join(indir, f) for f in os.listdir(indir)]
        self.filters = filters
        self.keys = keys

    def __iter__(self):
        for filename in self.infiles:
            try:
                df = pd.read_parquet(filename)
            except Exception:
                print('Invalid parquet file %s\n' % filename)
                continue
            if self.keys is not None:
                try:
                    assert (self.keys[filename].values
                            == df['key'].values).all()
                except AssertionError:
                    # This never happens
                    print('Skipping %s because of reordering'
                          ' and my unwillingness to write a join'
                          % filename)
                    continue
            if self.filters is not None:
                df = df[self.filters[filename].values]
            gc.collect()
            for text in df['sentences']:
                for sentence in text.split('\n'):
                    yield adjust_tokens(self.tokenizer.tokenize(sentence))


def write_iterables_to_files(iterables, files, n_cores=8):
    pairs = zip(iterables, files)
    with Pool(n_cores) as p:
        p.map(write_iterable_to_file, pairs)


def write_iterable_to_file(iterable_and_file):
    iterable, file_ = iterable_and_file
    with open(file_, 'w') as f:
        for tokens in iterable:
            f.write(' '.join(tokens) + '\n')


if __name__ == '__main__':
    files = sorted([f for f in glob.glob('data/sentence-parquets/*')
                    if 'parts' in f and 'sampled' not in f])
    os.makedirs('data/processed-sentences-text', exist_ok=True)
    iterables = [ParquetsIterable(infiles=files[i::8]) for i in range(8)]
    outfiles = ['data/processed-sentences-text/sentences-%d.txt' % i
                for i in range(8)]
    write_iterables_to_files(iterables, outfiles, n_cores=8)
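
For comparison, here is a sketch of an alternative layout that bounds any per-process growth by recycling workers. It builds on the ParquetsIterable class and the imports above; the helper names, the per-file output naming, and the maxtasksperchild value are assumptions rather than part of the original code:

def process_one_file(args):
    infile, outfile = args
    with open(outfile, 'w') as f:
        for tokens in ParquetsIterable(infiles=[infile]):
            f.write(' '.join(tokens) + '\n')


def write_files_with_recycling(infiles, outdir, n_cores=8):
    # One output text file per input parquet file avoids concurrent writes.
    pairs = [(f, os.path.join(outdir, os.path.basename(f) + '.txt'))
             for f in infiles]
    # maxtasksperchild=4 replaces each worker after four files, so whatever
    # memory it has accumulated is returned to the OS when it exits.
    with Pool(n_cores, maxtasksperchild=4) as p:
        p.map(process_one_file, pairs, chunksize=1)

This does not explain the growth, but it caps it at whatever a single worker accumulates over four files.
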
Zorgoth
  • The tokenization is wrapped up in an iterable because I intended to use the output with Gensim's Word2Vec model, although I no longer need this format (in fact, the purpose of write_iterables_to_files is to create something that can be iterated over more simply by a different class, since Gensim can't work in parallel when given the original ParquetsIterable class). – Zorgoth Sep 23 '22 at 17:55
  • I don't see anything immediately obvious that should lead to a memory leak; see https://stackoverflow.com/questions/1435415/python-memory-leaks. But generally, if you're waiting around ~8 hours for this to run, you should definitely focus on optimizing your pandas operations, e.g. using [vectorized string methods](https://pandas.pydata.org/docs/user_guide/text.html) rather than looping over all elements and manipulating each cell individually. You might see speedups of several orders of magnitude and significant memory use reductions, making this issue moot. – Michael Delgado Sep 23 '22 at 18:12
  • It could be this: https://issues.apache.org/jira/browse/ARROW-14987 – 0x26res Sep 23 '22 at 18:40
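
Following up on the ARROW-14987 link in the last comment: pandas reads parquet through pyarrow by default, and pyarrow's jemalloc allocator can hold on to freed memory instead of returning it to the OS, which looks like a leak from outside the process. A commonly suggested mitigation (a sketch, assuming pyarrow is the engine in use and was built with jemalloc) is to make the allocator release memory promptly, or to switch allocators:

import pyarrow as pa

# Ask jemalloc to return freed memory to the OS immediately instead of
# keeping it cached. Only meaningful if pyarrow was built with jemalloc
# (the default on Linux).
pa.jemalloc_set_decay_ms(0)

# Alternatively, switch to the plain system allocator before any reads:
# pa.set_memory_pool(pa.system_memory_pool())

# Arrow's own allocation counter, logged alongside process RSS, shows how
# much of the growth is held inside Arrow.
print(pa.total_allocated_bytes())

The same switch can also be made without code changes by setting the ARROW_DEFAULT_MEMORY_POOL=system environment variable before starting the workers.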

0 Answers