I have a 25 GB plaintext file with ~10 million lines and several hundred words per line. Each line needs to be processed individually, and I'm trying to split off chunks to a dozen workers to be processed in parallel. I'm currently loading a million lines at a time (for some reason this takes up ~10 GB of RAM even though it's only ~3 GB uncompressed on disk), splitting them evenly 12 ways, and then mapping them to 12 workers using multiprocessing.Pool.

The problem is that when each of my 12 workers finishes processing its allocated data, the RAM is not freed, and usage just grows by another ~10 GB on the next million-line iteration.

I've tried del'ing the previous data, resetting the previous data to an empty allocation, creating iterable variable names with eval(), calling gc.collect() after deletion, and entirely separating the I/O into its own function, all with no luck and exactly the same issue. Stepping through in the debugger shows that the Python interpreter only sees the expected data, and the data from the previous iteration is not accessible, so why isn't the RAM actually being freed?

The code below is my latest attempt at separating all the environments. It's not the most efficient, but "BigFileOnDisk" is on an SSD, so re-reading through the file each time is negligible compared to actually processing the data. Previously I had the "read" functionality inside the allocation function and deleted all the data after the workers finished, with the same results.

import datetime
from multiprocessing import Pool


def allocation():
    fileCompleted = False
    currentLine = 0
    while not fileCompleted:
        lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine)
        list_of_values(function_object=worker, inputs=lineData, workers=12)


def read(numLines, startLine=0):
    currentLine = 0
    lines = []
    with open(BigFileOnDisk, 'r') as fid:
        for line in fid:
            if currentLine >= startLine:
                lines.append(line)
            if currentLine - startLine >= numLines:
                return lines, currentLine, False
            currentLine += 1
        # or if we've hit the end of the file
        return lines, currentLine, True


def worker(lines):
    outputPath = *root* + str(datetime.datetime.now().time())
    processedData = {}

    for line in lines:
        pass  # process data

    del lines
    with open(outputPath, 'a') as fid:
        for item in processedData:
            fid.write(str(item) + ', ' + str(processedData[item]) + '\n')


def list_of_values(function_object, inputs, workers = 10):
    inputs_split = []
    subsection_start = 0
    for n in range(workers):
        start = int(subsection_start)
        end = int(subsection_start + len(inputs) / workers)
        subsection_start = end

        inputs_split.append( inputs[start:end] )

    p = Pool(workers)
    p.map(function_object, inputs_split)
– MKennedy
  • We need to see your code!! – kirbyfan64sos Jul 19 '16 at 23:30
  • @kirbyfan64sos Preferably a [mcve] of it. – jpmc26 Jul 19 '16 at 23:43
  • Code has been posted – MKennedy Jul 19 '16 at 23:54
  • Try adding `del linedata` to the end of the loop in `allocation`, and `del inputs` to the end of `list_of_values`. – Broseph Jul 20 '16 at 00:12
  • See [this answer](http://stackoverflow.com/a/11196615/868044). Also see [this article](http://chase-seibert.github.io/blog/2013/08/03/diagnosing-memory-leaks-python.html). – Dan Jul 20 '16 at 00:13
  • @Broseph as far as I can tell deleting the data via "del" does nothing, but shouldn't garbage collection take care of it once we leave the local function? – MKennedy Jul 20 '16 at 00:43
  • @Dan - the answer has some examples but nothing specific to RAM management (and basically what I already have), and I'm working through the memory management article but am not sure how helpful it will be - either garbage collection isn't collecting or somehow the extra processes aren't ending – MKennedy Jul 20 '16 at 00:43
  • I could not find a memory leak, but in the read() function you don't need to read every line from the start each time: use tell() to record the position when you finish, and seek() to jump back to it on the next call (see the sketch after this comment thread). – Ranger Wu Jul 20 '16 at 00:49
  • @Broseph see the accepted answer below, RAM wasn't being freed because the workers weren't closing properly and holding on to their data – MKennedy Jul 20 '16 at 03:49
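As a side note, here is a minimal sketch of the tell()/seek() approach Ranger Wu suggests in the comments; the function name read_chunk and its parameters are illustrative and not part of the original code:

def read_chunk(path, offset, num_lines):
    # Resume reading at a saved byte offset instead of re-scanning the
    # file from the start on every chunk.
    lines = []
    with open(path, 'r') as fid:
        fid.seek(offset)                      # jump to where the last call stopped
        for _ in range(num_lines):
            line = fid.readline()
            if not line:                      # hit end of file
                return lines, fid.tell(), True
            lines.append(line)
        return lines, fid.tell(), False       # pass fid.tell() back in on the next call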

1 Answer


You are not joining the subprocesses. After list_of_values finishes, the processes created by Pool are still alive (more like zombies, but with a living parent process). They still hold all their data. You can't see their data in the main process because it lives in the other processes (and for the same reason gc.collect() in the main process doesn't help).

To free the memory allocated by the workers you need to join the Pool manually, or use it with a with statement.

def list_of_values(function_object, inputs, workers = 10):
    inputs_split = []
    subsection_start = 0
    for n in range(workers):
        start = int(subsection_start)
        end = int(subsection_start + len(inputs) / workers)
        subsection_start = end

        inputs_split.append( inputs[start:end] )

    # Using the Pool as a context manager terminates the pool on exit,
    # so the worker processes are shut down and their memory is released.
    with Pool(workers) as p:
        p.map(function_object, inputs_split)
– Arnial
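If the with form isn't available (Pool only became a context manager in Python 3.3), the answer's other option, joining the pool manually, could look like this sketch of the same function's tail:

    p = Pool(workers)
    try:
        p.map(function_object, inputs_split)
    finally:
        p.close()   # stop accepting new work
        p.join()    # wait for the worker processes to exit so their memory is released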