I'm trying to count the occurrences of the words from a large corpus across a large pool of documents, using multiple threads in a MapReduce-like fashion. Each thread starts with a dictionary mapping every word in the corpus to zero, increments the counts as it goes over its share of the documents, and writes its dictionary to a JSON file when it's done. Once all threads are finished I want to accumulate the counts: I load the first JSON file into memory and loop through the rest, summing the counts (the per-thread counting stage is sketched after the merge code below):
import json
import os

def merge_counts(self):
    # load the first batch; this dictionary accumulates the final counts
    path = os.path.join(self.get_current_job_directory(), 'batch_0.json')
    with open(path, 'r') as json_file:
        kmers = json.load(json_file)
    batch = None
    for i in range(1, self.num_threads):
        path = os.path.join(self.get_current_job_directory(), 'batch_' + str(i) + '.json')
        with open(path, 'r') as json_file:
            batch = json.load(json_file)
        # add this batch's counts into the running total
        for kmer in batch:
            kmers[kmer] += batch[kmer]
        # drop the reference so the batch can be garbage collected
        batch = None
    return kmers
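For context, the counting stage on each thread looks roughly like this. It's a simplification of my actual code: count_batch, self.corpus, self.extract_words and the documents argument are stand-ins, but the shape is the same: every batch starts from the same full key set and is dumped to its own batch_<i>.json for merge_counts to pick up.

import json
import os

def count_batch(self, index, documents):
    # every thread starts from the same full key set, all zeros
    counts = dict.fromkeys(self.corpus, 0)
    for document in documents:
        # extract_words is a stand-in for however words/k-mers are pulled out of a document
        for word in self.extract_words(document):
            if word in counts:
                counts[word] += 1
    # each thread writes its own batch file for merge_counts to read later
    path = os.path.join(self.get_current_job_directory(), 'batch_' + str(index) + '.json')
    with open(path, 'w') as json_file:
        json.dump(counts, json_file)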
Here kmers is the eventual dictionary containing all the final counts. The number of keys in kmers stays the same no matter how many batches are loaded, since they all have the same keys, only with different counts. Each JSON file is roughly 3 GB, so the final result will be about the same size. I'm expecting this code to use about as much memory as holding two such JSON files in memory as dictionaries (the running total and the current batch), but the memory usage grows linearly with each batch until I eventually run out of memory. Why is that happening? Isn't the data previously referenced by the variable batch supposed to be garbage collected when I set it to None?
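In case the measurement matters: a minimal way to check the growth per iteration, assuming Linux where ru_maxrss is reported in kilobytes, would be something like this, called at the end of each loop iteration:

import resource

def log_peak_memory(label):
    # ru_maxrss is the process's peak RSS so far (kilobytes on Linux, bytes on macOS)
    peak_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    print('{}: peak RSS so far ~{} MB'.format(label, peak_kb // 1024))

Logging this once per iteration is enough to see whether the peak keeps climbing by roughly one batch's worth every time.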
Update: I added gc.collect() at the end of each loop iteration (shown below) and the memory still grows linearly with each iteration. Any ideas?
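Concretely, with import gc at the top of the module, the loop in merge_counts now reads (the gc.collect() call is the only change from the code above):

    for i in range(1, self.num_threads):
        path = os.path.join(self.get_current_job_directory(), 'batch_' + str(i) + '.json')
        with open(path, 'r') as json_file:
            batch = json.load(json_file)
        for kmer in batch:
            kmers[kmer] += batch[kmer]
        batch = None
        gc.collect()  # added: force a collection right after dropping the reference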