I have a 25 GB plaintext file with ~10 million lines and several hundred words per line. Each line needs to be processed individually, and I'm trying to hand off chunks to a dozen workers to process in parallel. Currently I load a million lines at a time (for some reason this takes up ~10 GB of RAM even though it's only ~3 GB uncompressed on disk), split it evenly 12 ways, and map it to 12 workers using multiprocessing.Pool.
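As an aside on the on-disk vs. in-RAM size: this is the kind of sanity check I've done on a chunk with sys.getsizeof (the sample line below is made up, not from the real file):

    import sys

    # Made-up stand-in for one line; the real ones are several hundred words long.
    sample = "word " * 300

    line_bytes = sys.getsizeof(sample)            # str header plus character data
    list_bytes = sys.getsizeof([None] * 1000000)  # pointer array of a million-entry list

    # Rough in-RAM cost of one million such lines, ignoring fragmentation.
    print((1000000 * line_bytes + list_bytes) / 1024 ** 3, "GB")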
The problem is that when the 12 workers finish processing their allocated data, the RAM is not freed; it just grows by another ~10 GB on the next million-line iteration.
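I'm mostly just watching the process in top, but as an illustration, this is roughly how the growth shows up if I poll the parent process with psutil (psutil isn't part of the actual script):

    import os
    import psutil  # used only for illustration here, not in the actual script

    proc = psutil.Process(os.getpid())
    # Resident set size of the parent in GB; it climbs by roughly 10 GB per
    # outer-loop iteration instead of dropping back to the baseline.
    print(proc.memory_info().rss / 1024 ** 3)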
I've tried del-ing the previous data, reassigning it to an empty list, generating the variable names with eval(), calling gc.collect() after deletion, and moving the I/O into its own function entirely, all with no luck and exactly the same issue. Stepping through in the debugger shows that the interpreter only sees the expected data, and the data from the previous iteration is not reachable, so why isn't the RAM actually being freed?
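For concreteness, the cleanup attempts at the end of each outer-loop iteration looked roughly like this (a simplified, self-contained stand-in; lineData represents the million-line chunk):

    import gc

    # Stand-in for the million-line chunk read on the previous iteration.
    lineData = ["some line of text\n"] * 1000000

    del lineData      # drop the only reference in this scope
    lineData = []     # ...or rebind to an empty list instead of deleting
    gc.collect()      # force a collection pass; in the real script the RAM still isn't released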
The code below is my latest attempt at keeping everything in separate scopes. It's not the most efficient approach, but "BigFileOnDisk" is on an SSD, so re-reading the file each time is negligible compared to actually processing the data. Previously I had the "read" functionality inside the allocation function and deleted all data after the workers finished, with the same results.
import datetime
from multiprocessing import Pool

# BigFileOnDisk and *root* are placeholders for paths defined elsewhere in my script.

def allocation():
    fileCompleted = False
    currentLine = 0
    while not fileCompleted:
        # read the next million-line chunk, then fan it out to the workers
        lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine)
        list_of_values(function_object=worker, inputs=lineData, workers=12)


def read(numLines, startLine=0):
    currentLine = 0
    lines = []
    with open(BigFileOnDisk, 'r') as fid:
        for line in fid:
            if currentLine >= startLine:
                lines.append(line)
                if currentLine - startLine >= numLines:
                    return lines, currentLine, False
            currentLine += 1
    # or if we've hit the end of the file
    return lines, currentLine, True


def worker(lines):
    outputPath = *root* + str(datetime.datetime.now().time())
    processedData = {}
    for line in lines:
        # process data
    del lines  # attempt to free the worker's copy of the chunk
    with open(outputPath, 'a') as fid:
        for item in processedData:
            fid.write(str(item) + ', ' + str(processedData[item]) + '\n')


def list_of_values(function_object, inputs, workers=10):
    # split the chunk into equal consecutive slices, one per worker
    inputs_split = []
    subsection_start = 0
    for n in range(workers):
        start = int(subsection_start)
        end = int(subsection_start + len(inputs) / workers)
        subsection_start = end
        inputs_split.append(inputs[start:end])

    p = Pool(workers)
    p.map(function_object, inputs_split)
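For completeness, allocation() is kicked off from the usual main guard, roughly like this:

    if __name__ == '__main__':
        # entry point; keeps the worker processes from re-running module-level
        # code when multiprocessing spawns them
        allocation()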