42

I have written the program (below) to:

  • read a huge text file as pandas dataframe
  • then groupby using a specific column value to split the data and store as list of dataframes.
  • then pipe the data to multiprocess Pool.map() to process each dataframe in parallel.

Everything is fine, the program works well on my small test dataset. But, when I pipe in my large data (about 14 GB), the memory consumption exponentially increases and then freezes the computer or gets killed (in HPC cluster).

I have added codes to clear the memory as soon as the data/variable isn't useful. I am also closing the pool as soon as it is done. Still with 14 GB input I was only expecting 2*14 GB memory burden, but it seems like lot is going on. I also tried to tweak using chunkSize and maxTaskPerChild, etc but I am not seeing any difference in optimization in both test vs. large file.

I think improvements to this code is/are required at this code position, when I start multiprocessing.

p = Pool(3) # number of pool to run at once; default at 1 result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values())) but, I am posting the whole code.

Test example: I created a test file ("genome_matrix_final-chr1234-1mb.txt") of upto 250 mb and ran the program. When I check the system monitor I can see that memory consumption increased by about 6 GB. I am not so clear why so much memory space is taken by 250 mb file plus some outputs. I have shared that file via drop box if it helps in seeing the real problem. https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0

Can someone suggest, How I can get rid of the problem?

My python script:

#!/home/bin/python3

import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource

print()
print('Checking required modules')
print()


''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt"   # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt"  # test file 02
#genome_matrix_file = "genome_matrix_final.txt"    # large file 

def main():
    with open("genome_matrix_header.txt") as header:
        header = header.read().rstrip('\n').split('\t')
        print()

    time01 = time.time()
    print('starting time: ', time01)

    '''load the genome matrix file onto pandas as dataframe.
    This makes is more easy for multiprocessing'''
    gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)

    # now, group the dataframe by chromosome/contig - so it can be multiprocessed
    gen_matrix_df = gen_matrix_df.groupby('CHROM')

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    gen_matrix_df_list = collections.OrderedDict()
    for chr_, data in gen_matrix_df:
        gen_matrix_df_list[chr_] = data

    # clear memory
    del gen_matrix_df

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    del gen_matrix_df_list  # clear memory

    p.close()
    p.join()


    # concat the results from pool.map() and write it to a file
    result_merged = pd.concat(result)
    del result  # clear memory

    pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='\t', header=True, index=False)

    print()
    print('completed all process in "%s" sec. ' % (time.time() - time01))
    print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
    print()


'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):

    print()
    time02 = time.time()

    # index position of the samples in genome matrix file
    sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
                    {'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
                    {'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
                    {'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
                    {'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
                    {'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
                    {'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
                    {'8a': 32, '8b': 17}]

    # sample index stored as ordered dictionary
    sample_idx_ord_list = []
    for ids in sample_idx:
        ids = collections.OrderedDict(sorted(ids.items()))
        sample_idx_ord_list.append(ids)


    # for haplotype file
    header = ['contig', 'pos', 'ref', 'alt']

    # adding some suffixes "PI" to available sample names
    for item in sample_idx_ord_list:
        ks_update = ''
        for ks in item.keys():
            ks_update += ks
        header.append(ks_update+'_PI')
        header.append(ks_update+'_PG_al')


    #final variable store the haplotype data
    # write the header lines first
    haplotype_output = '\t'.join(header) + '\n'


    # to store the value of parsed the line and update the "PI", "PG" value for each sample
    updated_line = ''

    # read the piped in data back to text like file
    matrix_df = pd.DataFrame.to_csv(matrix_df, sep='\t', index=False)

    matrix_df = matrix_df.rstrip('\n').split('\n')
    for line in matrix_df:
        if line.startswith('CHROM'):
            continue

        line_split = line.split('\t')
        chr_ = line_split[0]
        ref = line_split[2]
        alt = list(set(line_split[3:]))

        # remove the alleles "N" missing and "ref" from the alt-alleles
        alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))

        # if no alt alleles are found, just continue
        # - i.e : don't write that line in output file
        if len(alt_up) == 0:
            continue

        #print('\nMining data for chromosome/contig "%s" ' %(chr_ ))
        #so, we have data for CHR, POS, REF, ALT so far
        # now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
        sample_data_for_vcf = []
        for ids in sample_idx_ord_list:
            sample_data = []
            for key, val in ids.items():
                sample_value = line_split[val]
                sample_data.append(sample_value)

            # now, update the phased state for each sample
            # also replacing the missing allele i.e "N" and "-" with ref-allele
            sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
            sample_data_for_vcf.append(str(chr_))
            sample_data_for_vcf.append(sample_data)

        # add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
        # and .. write it to final haplotype file
        sample_data_for_vcf = '\t'.join(sample_data_for_vcf)
        updated_line = '\t'.join(line_split[0:3]) + '\t' + ','.join(alt_up) + \
            '\t' + sample_data_for_vcf + '\n'
        haplotype_output += updated_line

    del matrix_df  # clear memory
    print('completed haplotype preparation for chromosome/contig "%s" '
          'in "%s" sec. ' %(chr_, time.time()-time02))
    print('\tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))

    # return the data back to the pool
    return pd.read_csv(io.StringIO(haplotype_output), sep='\t')


''' to monitor memory '''
def current_mem_usage():
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.


if __name__ == '__main__':
    main()

Update for bounty hunters:

I have achieved multiprocessing using Pool.map() but the code is causing a big memory burden (input test file ~ 300 mb, but memory burden is about 6 GB). I was only expecting 3*300 mb memory burden at max.

  • Can somebody explain, What is causing such a huge memory requirement for such a small file and for such small length computation.
  • Also, i am trying to take the answer and use that to improve multiprocess in my large program. So, addition of any method, module that doesn't change the structure of computation part (CPU bound process) too much should be fine.
  • I have included two test files for the test purposes to play with the code.
  • The attached code is full code so it should work as intended as it is when copied-pasted. Any changes should be used only to improve optimization in multiprocessing steps.
Abdulrahman Bres
  • 2,603
  • 1
  • 20
  • 39
everestial007
  • 6,665
  • 7
  • 32
  • 72
  • My suggestion is to work on pyspark if you have heavy file to process. – Dinusha Thilakarathna Mar 22 '18 at 16:37
  • @DinushaDilanka : I just briefly skimmed through pyspark. It looks good, but is it a replacement for pandas. IAlso, another problem is that I will have to learn a new package and rewrite my whole program. This above program is a just a mock run of my program and data to rid of the memory issue on multiprocessing. Any examples on your suggestion would be good. Thanks, – everestial007 Mar 22 '18 at 20:22
  • Python does not have good performance in multiprocessing because it was a script language. Therefore it is better get help from other library that support to python API like pyspark. – Dinusha Thilakarathna Mar 23 '18 at 03:16
  • If you are using pyspark you can read all files at one time, also possible to do groupby without using any multiprocessing. – Dinusha Thilakarathna Mar 23 '18 at 03:24
  • Give me an example. Thanks! – everestial007 Mar 23 '18 at 03:47
  • 1
    Please refer this [link](https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/) – Dinusha Thilakarathna Mar 23 '18 at 03:52
  • @DinushaDilanka I had looked at several documentation on `pyspark` when you mentioned it today morning. But, to be honest you are being less helpful. In that link look at `7. Pandas vs PySpark DataFrame`, which suggests that there are several incompatibilities - thus this will cause a bit of a problem. I am trying to finish my project and this is less than helpful. If you can take the above code and shared file in dropbox how you propose on doing it - so I can apply the idea to my bigger program. Else, I doubt switching to pyspark in a day is possible. Thanks though ! – everestial007 Mar 23 '18 at 04:13
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/167382/discussion-between-dinusha-dilanka-and-everestial007). – Dinusha Thilakarathna Mar 23 '18 at 04:17
  • 1
    Can you reduce this to a simpler example, without any irrelevant code, that has the same problem, and where a solution to your example would let you build a solution for your real code? That would make this a lot easier to solve. See [mcve] in the help for pointers. (This is definitely an answerable question as-is, it could just be a more easily answerable question.) – abarnert Mar 25 '18 at 19:30
  • @abarnert: Thanks for looking. I always get the feedback that my questions are not complete enough. So, I posted this question which is minimal in what I am trying to do. I put another question though, which was to ask people on what I am having the problem. But, this one is less technical, I guess : https://stackoverflow.com/questions/49475489/why-does-memory-consumption-increase-dramatically-in-pool-map-multiprocessin – everestial007 Mar 25 '18 at 19:33
  • 1
    Figuring out how to make a question complete and minimal at the same time is usually not easy—strip out too many irrelevancies and people will just ask "Why would you want to do this?" But if you give us code that we can run and play with without needing to understand your file format and how you're processing it in Pandas and so on, it may be easier to find (and test) a solution. – abarnert Mar 25 '18 at 19:38
  • @abarnert: Sorry, if I am not following you. You seem to be quite succint on your expression (being a expert programmer). The above code and data are workable (with just copy and paste), the only problem I am facing is performance gain. I would like to improve my question and expression, but I would need specific detail on what needs to be added. – everestial007 Mar 25 '18 at 19:44
  • You didn't include the "genome_matrix_header.txt" file in the dropbox, so it won't run as-is. Could you please include it? Thanks. – Brian Mar 29 '18 at 00:27
  • @Brian: I just added that file to the shared dropbox link. In the script I also added method to compute and display the current memory usage by the process. Hope it helps, and let me know if you have any questions. – everestial007 Mar 29 '18 at 02:42
  • Hey @everestial007 did my answer not work? – Jeff Ellen Mar 30 '18 at 17:34

4 Answers4

94

Prerequisite

  1. In Python (in the following I use 64-bit build of Python 3.6.5) everything is an object. This has its overhead and with getsizeof we can see exactly the size of an object in bytes:

    >>> import sys
    >>> sys.getsizeof(42)
    28
    >>> sys.getsizeof('T')
    50
    
  2. When fork system call used (default on *nix, see multiprocessing.get_start_method()) to create a child process, parent's physical memory is not copied and copy-on-write technique is used.
  3. Fork child process will still report full RSS (resident set size) of the parent process. Because of this fact, PSS (proportional set size) is more appropriate metric to estimate memory usage of forking application. Here's an example from the page:
  • Process A has 50 KiB of unshared memory
  • Process B has 300 KiB of unshared memory
  • Both process A and process B have 100 KiB of the same shared memory region

Since the PSS is defined as the sum of the unshared memory of a process and the proportion of memory shared with other processes, the PSS for these two processes are as follows:

  • PSS of process A = 50 KiB + (100 KiB / 2) = 100 KiB
  • PSS of process B = 300 KiB + (100 KiB / 2) = 350 KiB

The data frame

Not let's look at your DataFrame alone. memory_profiler will help us.

justpd.py

#!/usr/bin/env python3

import pandas as pd
from memory_profiler import profile

@profile
def main():
    with open('genome_matrix_header.txt') as header:
        header = header.read().rstrip('\n').split('\t')

    gen_matrix_df = pd.read_csv(
        'genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)

    gen_matrix_df.info()
    gen_matrix_df.info(memory_usage='deep')

if __name__ == '__main__':
    main()

Now let's use the profiler:

mprof run justpd.py
mprof plot

We can see the plot:

memory_profile

and line-by-line trace:

Line #    Mem usage    Increment   Line Contents
================================================
     6     54.3 MiB     54.3 MiB   @profile
     7                             def main():
     8     54.3 MiB      0.0 MiB       with open('genome_matrix_header.txt') as header:
     9     54.3 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    10                             
    11   2072.0 MiB   2017.7 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    12                                 
    13   2072.0 MiB      0.0 MiB       gen_matrix_df.info()
    14   2072.0 MiB      0.0 MiB       gen_matrix_df.info(memory_usage='deep')

We can see that the data frame takes ~2 GiB with peak at ~3 GiB while it's being built. What's more interesting is the output of info.

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4000000 entries, 0 to 3999999
Data columns (total 34 columns):
...
dtypes: int64(2), object(32)
memory usage: 1.0+ GB

But info(memory_usage='deep') ("deep" means introspection of the data deeply by interrogating object dtypes, see below) gives:

memory usage: 7.9 GB

Huh?! Looking outside of the process we can make sure that memory_profiler's figures are correct. sys.getsizeof also shows the same value for the frame (most probably because of custom __sizeof__) and so will other tools that use it to estimate allocated gc.get_objects(), e.g. pympler.

# added after read_csv
from pympler import tracker
tr = tracker.SummaryTracker()
tr.print_diff()   

Gives:

                                             types |   # objects |   total size
================================================== | =========== | ============
                 <class 'pandas.core.series.Series |          34 |      7.93 GB
                                      <class 'list |        7839 |    732.38 KB
                                       <class 'str |        7741 |    550.10 KB
                                       <class 'int |        1810 |     49.66 KB
                                      <class 'dict |          38 |      7.43 KB
  <class 'pandas.core.internals.SingleBlockManager |          34 |      3.98 KB
                             <class 'numpy.ndarray |          34 |      3.19 KB

So where do these 7.93 GiB come from? Let's try to explain this. We have 4M rows and 34 columns, which gives us 134M values. They are either int64 or object (which is a 64-bit pointer; see using pandas with large data for detailed explanation). Thus we have 134 * 10 ** 6 * 8 / 2 ** 20 ~1022 MiB only for values in the data frame. What about the remaining ~ 6.93 GiB?

String interning

To understand the behaviour it's necessary to know that Python does string interning. There are two good articles (one, two) about string interning in Python 2. Besides the Unicode change in Python 3 and PEP 393 in Python 3.3 the C-structures have changed, but the idea is the same. Basically, every short string that looks like an identifier will be cached by Python in an internal dictionary and references will point to the same Python objects. In other word we can say it behaves like a singleton. Articles that I mentioned above explain what significant memory profile and performance improvements it gives. We can check if a string is interned using interned field of PyASCIIObject:

import ctypes

class PyASCIIObject(ctypes.Structure):
     _fields_ = [
         ('ob_refcnt', ctypes.c_size_t),
         ('ob_type', ctypes.py_object),
         ('length', ctypes.c_ssize_t),
         ('hash', ctypes.c_int64),
         ('state', ctypes.c_int32),
         ('wstr', ctypes.c_wchar_p)
    ]

Then:

>>> a = 'name'
>>> b = '!@#$'
>>> a_struct = PyASCIIObject.from_address(id(a))
>>> a_struct.state & 0b11
1
>>> b_struct = PyASCIIObject.from_address(id(b))
>>> b_struct.state & 0b11
0

With two strings we can also do identity comparison (addressed in memory comparison in case of CPython).

>>> a = 'foo'
>>> b = 'foo'
>>> a is b
True
>> gen_matrix_df.REF[0] is gen_matrix_df.REF[6]
True

Because of that fact, in regard to object dtype, the data frame allocates at most 20 strings (one per amino acids). Though, it's worth noting that Pandas recommends categorical types for enumerations.

Pandas memory

Thus we can explain the naive estimate of 7.93 GiB like:

>>> rows = 4 * 10 ** 6
>>> int_cols = 2
>>> str_cols = 32
>>> int_size = 8
>>> str_size = 58  
>>> ptr_size = 8
>>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30
7.927417755126953

Note that str_size is 58 bytes, not 50 as we've seen above for 1-character literal. It's because PEP 393 defines compact and non-compact strings. You can check it with sys.getsizeof(gen_matrix_df.REF[0]).

Actual memory consumption should be ~1 GiB as it's reported by gen_matrix_df.info(), it's twice as much. We can assume it has something to do with memory (pre)allocation done by Pandas or NumPy. The following experiment shows that it's not without reason (multiple runs show the save picture):

Line #    Mem usage    Increment   Line Contents
================================================
     8     53.1 MiB     53.1 MiB   @profile
     9                             def main():
    10     53.1 MiB      0.0 MiB       with open("genome_matrix_header.txt") as header:
    11     53.1 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    12                             
    13   2070.9 MiB   2017.8 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    14   2071.2 MiB      0.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    15   2071.2 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    16   2040.7 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    23   1827.1 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    24   1094.7 MiB   -732.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    25   1765.9 MiB    671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    26   1094.7 MiB   -671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    27   1704.8 MiB    610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    28   1094.7 MiB   -610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    29   1643.9 MiB    549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    30   1094.7 MiB   -549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    31   1582.8 MiB    488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    32   1094.7 MiB   -488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])    
    33   1521.9 MiB    427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])    
    34   1094.7 MiB   -427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    35   1460.8 MiB    366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    36   1094.7 MiB   -366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    37   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    47   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])

I want to finish this section by a quote from fresh article about design issues and future Pandas2 by original author of Pandas.

pandas rule of thumb: have 5 to 10 times as much RAM as the size of your dataset

Process tree

Let's come to the pool, finally, and see if can make use of copy-on-write. We'll use smemstat (available form an Ubuntu repository) to estimate process group memory sharing and glances to write down system-wide free memory. Both can write JSON.

We'll run original script with Pool(2). We'll need 3 terminal windows.

  1. smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
  2. glances -t 1 --export-json glances.json
  3. mprof run -M script.py

Then mprof plot produces:

3 processes

The sum chart (mprof run --nopython --include-children ./script.py) looks like:

enter image description here

Note that two charts above show RSS. The hypothesis is that because of copy-on-write it's doesn't reflect actual memory usage. Now we have two JSON files from smemstat and glances. I'll the following script to covert the JSON files to CSV.

#!/usr/bin/env python3

import csv
import sys
import json

def smemstat():
  with open('smemstat.json') as f:
    smem = json.load(f)

  rows = []
  fieldnames = set()    
  for s in smem['smemstat']['periodic-samples']:
    row = {}
    for ps in s['smem-per-process']:
      if 'script.py' in ps['command']:
        for k in ('uss', 'pss', 'rss'):
          row['{}-{}'.format(ps['pid'], k)] = ps[k] // 2 ** 20

    # smemstat produces empty samples, backfill from previous
    if rows:            
      for k, v in rows[-1].items():
        row.setdefault(k, v)

    rows.append(row)
    fieldnames.update(row.keys())

  with open('smemstat.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=sorted(fieldnames))
    dw.writeheader()
    list(map(dw.writerow, rows))

def glances():
  rows = []
  fieldnames = ['available', 'used', 'cached', 'mem_careful', 'percent',
    'free', 'mem_critical', 'inactive', 'shared', 'history_size',
    'mem_warning', 'total', 'active', 'buffers']
  with open('glances.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=fieldnames)
    dw.writeheader()
    with open('glances.json') as f:
      for l in f:
        d = json.loads(l)
        dw.writerow(d['mem'])

if __name__ == '__main__':
  globals()[sys.argv[1]]()

First let's look at free memory.

enter image description here

The difference between first and minimum is ~4.15 GiB. And here is how PSS figures look like:

enter image description here

And the sum:

enter image description here

Thus we can see that because of copy-on-write actual memory consumption is ~4.15 GiB. But we're still serialising data to send it to worker processes via Pool.map. Can we leverage copy-on-write here as well?

Shared data

To use copy-on-write we need to have the list(gen_matrix_df_list.values()) be accessible globally so the worker after fork can still read it.

  1. Let's modify code after del gen_matrix_df in main like the following:

    ...
    global global_gen_matrix_df_values
    global_gen_matrix_df_values = list(gen_matrix_df_list.values())
    del gen_matrix_df_list
    
    p = Pool(2)
    result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values)))
    ...
    
  2. Remove del gen_matrix_df_list that goes later.
  3. And modify first lines of matrix_to_vcf like:

    def matrix_to_vcf(i):
        matrix_df = global_gen_matrix_df_values[i]
    

Now let's re-run it. Free memory:

free

Process tree:

process tree

And its sum:

sum

Thus we're at maximum of ~2.9 GiB of actual memory usage (the peak main process has while building the data frame) and copy-on-write has helped!

As a side note, there's so called copy-on-read, the behaviour of Python's reference cycle garbage collector, described in Instagram Engineering (which led to gc.freeze in issue31558). But gc.disable() doesn't have an impact in this particular case.

Update

An alternative to copy-on-write copy-less data sharing can be delegating it to the kernel from the beginning by using numpy.memmap. Here's an example implementation from High Performance Data Processing in Python talk. The tricky part is then to make Pandas to use the mmaped Numpy array.

saaj
  • 23,253
  • 3
  • 104
  • 105
  • 14
    Such a comprehensive, detailed and beautiful answer. I wish I could put 50 points on you. But, it was already given. But, this is the accepted answer. I am going to reflect back several time to this Q/A in my programming career. Most helpful are the method you put there for finding the devil that was causing memory issue. There is a saying, “Devil is in the details.” – everestial007 May 05 '18 at 14:52
  • "But gc.disable() doesn't have an impact in this particular case." - Why that wouldn't help against the copy-on-read behavior? – fjsj Oct 06 '20 at 14:53
  • Though almost 3 years old later...I am facing a similar issue...just my pandas processing is being done inside the thread and am still facing out of memory issues...can u help.. – Fr_nkenstien Apr 06 '21 at 20:28
14

When you use multiprocessing.Pool a number of child processes will be created using the fork() system call. Each of those processes start off with an exact copy of the memory of the parent process at that time. Because you're loading the csv before you create the Pool of size 3, each of those 3 processes in the pool will unnecessarily have a copy of the data frame. (gen_matrix_df as well as gen_matrix_df_list will exist in the current process as well as in each of the 3 child processes, so 4 copies of each of these structures will be in memory)

Try creating the Pool before loading the file (at the very beginning actually) That should reduce the memory usage.

If it's still too high, you can:

  1. Dump gen_matrix_df_list to a file, 1 item per line, e.g:

    import os
    import cPickle
    
    with open('tempfile.txt', 'w') as f:
        for item in gen_matrix_df_list.items():
            cPickle.dump(item, f)
            f.write(os.linesep)
    
  2. Use Pool.imap() on an iterator over the lines that you dumped in this file, e.g.:

    with open('tempfile.txt', 'r') as f:
        p.imap(matrix_to_vcf, (cPickle.loads(line) for line in f))
    

    (Note that matrix_to_vcf takes a (key, value) tuple in the example above, not just a value)

I hope that helps.

NB: I haven't tested the code above. It's only meant to demonstrate the idea.

tomas
  • 963
  • 6
  • 19
  • thanks for the answer. I will try this answer in about a day and let you know. I am hoping this is going to work. – everestial007 Mar 27 '18 at 02:08
  • You might not need to suffer the disk IO if you can fit your data in memory twice. I had exactly this problem with a large DataFrame (stored in self.big_df), but I was able to get away with an easier solution: just chunk the DataFrame. I had a quick loop build a list of parameters with chunks of the df, (so now memory is 2x self.big_df - one for original and one for the chunks) and then I explicitly assigned self.big_df={}. I subsequently created the pool and no longer had memory issues, each thread only had memory demands equal to a small percentage of the original df. – Jeff Ellen Mar 27 '18 at 07:18
  • Ok, I didn't see that's what @everestial007 was already doing, and too long had elapsed to edit my comment. I think it's just that the GC isn't happening. This answer is better if your data can only fit in memory once, but you're potentially waiting a long time for the disk if you write it back out and then read it back in again if you don't have to. – Jeff Ellen Mar 27 '18 at 07:35
  • The suggestion to dump data into disk and stream from there is only in case creating the pool at the top of the function doesn't reduce memory consumption enough. I think starting the pool before loading anything will have the greatest impact though, because right now everything is stored in memory in 4 different processes. – tomas Mar 27 '18 at 12:06
  • @tomas I haven't tried it, but I don't see how instantiating the pool any earlier reduces the memory. I don't think the pool has only one master copy of the data structures, I think each one has it's own, based on statements like this: "This can be problematic for large arguments as they will be reallocated n_jobs times by the workers" (from https://pythonhosted.org/joblib/parallel.html) – Jeff Ellen Mar 28 '18 at 18:00
  • Each worker process has its own copy of the argument, that is true (in this case `list(gen_matrix_df_list.values())`. In addition to that, each worker has its own copy of every variable that has been created before the worker was spawned. (in this case, the biggest one is `gen_matrix_df_list`). Regarding `gen_matrix_df`, I'm not sure what would happen with that, because its reference is deleted, but it may still be allocated. Calling `gc.collect()` after `del gen_matrix_df` is also a good idea. – tomas Mar 29 '18 at 08:39
  • 1
    @tomas The only thing that improved my memory usage was to move the `p=Pool(3)` at the beginning of the main function. Thank you. All, other things really didn't improve anything. Even reassignment of the variable rather than deletion made no difference. I think I am going to take this approach: https://stackoverflow.com/questions/34143397/python-multiprocessing-on-a-generator-that-reads-files-in by splitting my file by `chr_`. I received not complete answer, but still I would like to offer the bounty. @jeff ellen too suggested moving the `Pool()` ahead. – everestial007 Apr 01 '18 at 21:03
  • I don't understand why you link to a question that was about reading in 1000's of small files, when you have just 1 large file. So you're saying your solution is to pre-process your one large file into a bunch of smaller ones (with a separate thread), then later run code as in the one you linked, to read those small files back in? That's more complicated because it's more lines of code to debug and now you need to keep two copies of your data, and adds a lot of extra disk overhead so you won't get your overall answer as fast. Also, I didn't suggest move the Pool() ahead, not sure that helps. – Jeff Ellen Apr 02 '18 at 06:37
10

I had the same issue. I needed to process a huge text corpus while keeping a knowledge base of few DataFrames of millions of rows loaded in memory. I think this issue is common so I will keep my answer oriented for general purposes.

A combination of settings solved the problem for me (1 & 3 & 5 only might do it for you):

  1. Use Pool.imap (or imap_unordered) instead of Pool.map. This will iterate over data lazily than loading all of it in memory before starting processing.

  2. Set a value to chunksize parameter. This will make imap faster too.

  3. Set a value to maxtasksperchild parameter.

  4. Append output to disk than in memory. Instantly or every while when it reaches a certain size.

  5. Run the code in different batches. You can use itertools.islice if you have an iterator. The idea is to split your list(gen_matrix_df_list.values()) to three or more lists, then you pass the first third only to map or imap, then the second third in another run, etc. Since you have a list you can simply slice it in the same line of code.

Abdulrahman Bres
  • 2,603
  • 1
  • 20
  • 39
  • Thanks for the answer. Can you should me the code style of yours (using your own data, or my data) so I can transfer the idea on this question and my large program. – everestial007 Mar 24 '18 at 23:21
  • I think there is no gain for me using #5, since the data will be in queue (as input, and as output) regardless. Only 4 seems to make a reasonable gain in memory optimization, but would it not cause i/o bottleneck, and unordered output. Also, I just tried `imap` and I don't see any gain (both speed and memory consuption). – everestial007 Mar 24 '18 at 23:27
  • there are modules `settings` and `read_data`. Are those your local module ? – everestial007 Mar 25 '18 at 00:03
  • Yes few are, settings has files paths, and read-data has iterator to read from a huge json file item by item. While annotator module takes an item and returns processed text. I don't mind showing all the project, but it is not done yet and not all parts are needed or work. – Abdulrahman Bres Mar 25 '18 at 00:09
  • hello @AbdulrahmanBres could share the code style with above parameters you mentioned for me to see how can i use it in my program. The above link does not above file anymore so. – rozi Aug 14 '23 at 16:19
  • @rozi https://github.com/abdo-br/wikipedia-joint-embeddings/blob/7993a1cf64b5831f800d299cce16aa9626550a38/extract_stats.py#L78C5-L78C5 – Abdulrahman Bres Aug 15 '23 at 17:14
  • @AbdulrahmanBres Thank you so much for proving the link. I have a quick question can similar iteration and chunk size can be used for file writing. And how it would differ? – rozi Aug 15 '23 at 18:36
  • @rozi A similar iteration concept can be applied to write data to a file, but the target file will likely get data in a wrong order because with imap, the pool process which starts and finishes first would write its data chunk first. Also, if two processes try to write to a file at the same time this will throw an error because the file is locked by one of them. You can solve this of course in different ways but keep your requirements in mind. Check here https://superfastpython.com/multiprocessing-pool-imap/#:~:text=The%20imap()%20function%20should,all%20issued%20tasks%20have%20completed. – Abdulrahman Bres Aug 15 '23 at 19:38
  • Thank you very much for the answer and the link. – rozi Aug 21 '23 at 17:55
4

GENERAL ANSWER ABOUT MEMORY WITH MULTIPROCESSING

You asked: "What is causing so much memory to be allocated". The answer relies on two parts.

First, as you already noticed, each multiprocessing worker gets it's own copy of the data (quoted from here), so you should chunk large arguments. Or for large files, read them in a little bit at a time, if possible.

By default the workers of the pool are real Python processes forked using the multiprocessing module of the Python standard library when n_jobs != 1. The arguments passed as input to the Parallel call are serialized and reallocated in the memory of each worker process.

This can be problematic for large arguments as they will be reallocated n_jobs times by the workers.

Second, if you're trying to reclaim memory, you need to understand that python works differently than other languages, and you are relying on del to release the memory when it doesn't. I don't know if it's best, but in my own code, I've overcome this be reassigning the variable to a None or empty object.

FOR YOUR SPECIFIC EXAMPLE - MINIMAL CODE EDITING

As long as you can fit your large data in memory twice, I think you can do what you are trying to do by just changing a single line. I've written very similar code and it worked for me when I reassigned the variable (vice call del or any kind of garbage collect). If this doesn't work, you may need to follow the suggestions above and use disk I/O:

    #### earlier code all the same
    # clear memory by reassignment (not del or gc)
    gen_matrix_df = {}

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    #del gen_matrix_df_list  # I suspect you don't even need this, memory will free when the pool is closed

    p.close()
    p.join()
    #### later code all the same

FOR YOUR SPECIFIC EXAMPLE - OPTIMAL MEMORY USAGE

As long as you can fit your large data in memory once, and you have some idea of how big your file is, you can use Pandas read_csv partial file reading, to read in only nrows at a time if you really want to micro-manage how much data is being read in, or a [fixed amount of memory at a time using chunksize], which returns an iterator5. By that I mean, the nrows parameter is just a single read: you might use that to just get a peek at a file, or if for some reason you wanted each part to have exactly the same number of rows (because, for example, if any of your data is strings of variable length, each row will not take up the same amount of memory). But I think for the purposes of prepping a file for multiprocessing, it will be far easier to use chunks, because that directly relates to memory, which is your concern. It will be easier to use trial & error to fit into memory based on specific sized chunks than number of rows, which will change the amount of memory usage depending on how much data is in the rows. The only other difficult part is that for some application specific reason, you're grouping some rows, so it just makes it a little bit more complicated. Using your code as an example:

   '''load the genome matrix file onto pandas as dataframe.
    This makes is more easy for multiprocessing'''

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    #not sure why you need the ordered dict here, might add memory overhead
    #gen_matrix_df_list = collections.OrderedDict()  
    #a defaultdict won't throw an exception when we try to append to it the first time. if you don't want a default dict for some reason, you have to initialize each entry you care about.
    gen_matrix_df_list = collections.defaultdict(list)   
    chunksize = 10 ** 6

    for chunk in pd.read_csv(genome_matrix_file, sep='\t', names=header, chunksize=chunksize)
        # now, group the dataframe by chromosome/contig - so it can be multiprocessed
        gen_matrix_df = chunk.groupby('CHROM')
        for chr_, data in gen_matrix_df:
            gen_matrix_df_list[chr_].append(data)

    '''Having sorted chunks on read to a list of df, now create single data frames for each chr_'''
    #The dict contains a list of small df objects, so now concatenate them
    #by reassigning to the same dict, the memory footprint is not increasing 
    for chr_ in gen_matrix_df_list.keys():
        gen_matrix_df_list[chr_]=pd.concat(gen_matrix_df_list[chr_])

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
    p.close()
    p.join()
Jeff Ellen
  • 540
  • 2
  • 8
  • Yours and answer by Tomas look promising. And, I hadn’t had time to test it. I will do it tomorrow. I like the idea of reassignment. For now about `As long as you can fit .... in memory twice` - why not 3 times, 4 times? I was also thinking if there is a way to create the list as interator, generator or yield and pass it to `Pool.map()` process. Any suggestions? – everestial007 Mar 31 '18 at 01:46
  • @everestial007 Because you only need to fit it in twice: the full original copy, and each chunk as you make the chunks, so twice. 3 or 4 times is just excessive. When you make a generator you only save on memory if you don't first have the whole item in memory (or if you are doing something new, like the generator being the result of a zip of two existing lists). And actually, I didn't know it before, but after looking, pandas has a partial file read method that would work better in your case, I bet. I'll edit my answer. – Jeff Ellen Mar 31 '18 at 14:49
  • The only thing that improved my memory usage was to move the `p=Pool(3)` at the beginning of the main function. The assignment of chunksize won't be helpful to me because I have to read the whole data from one chromosome at once - a little complicated reason. I was also thinking if reading data as iterator, generator would help. Rather, this method https://stackoverflow.com/questions/34143397/python-multiprocessing-on-a-generator-that-reads-files-in was able to work better than anything. But, there will be some drag due to I/O rewriting. – everestial007 Apr 01 '18 at 20:55
  • Also, the reassignment really didn't reduce the memory use. I am not sure for what reason. – everestial007 Apr 01 '18 at 20:57
  • @everestial007 Your response to me makes no sense, did you try my code? You say my solution won't work because "you have to read the whole data from one chromosome at once". But your original code doesn't do that. It reads in the whole CSV end to end, nothing special. Then your code uses the 'group by' to prepare some chromosome group for each member in the pool. My code does almost exactly the same: It reads in a chunk of the file, then uses 'group by' to prepare a chromosome group. The only question is whether or not I picked a good chunksize for your system, you might have to adjust it. – Jeff Ellen Apr 02 '18 at 06:27