1

My data file is gnomad.exomes.r2.1.1.sites.13.vcf.bgz.

I have decompressed the VCF. And here is my code to insert in Postgres after filtering data:

# =============================================================================
# import libraries for python 
# =============================================================================
import vcf
import os.path
from sqlalchemy import create_engine
import psycopg2       
import datetime
import sys
import _thread
import csv

from io import StringIO
import pandas as pd

db_user_name = "postgres";db_password = "password";db_name = "vcd";db_host_name = "localhost"
url = 'postgres://'+db_user_name+':'+db_password+'@'+db_host_name+'/'+db_name
engine = create_engine(url)

# =============================================================================
# function drops the previous table if exists and creates new table with dynamic table name
# =============================================================================
def create_table(table_name,table_type,conn_obj):
    dbcur = conn_obj.cursor()
    dbcur.execute("""DROP TABLE IF EXISTS {0};""".format(table_name))
    if table_type == '1' :
        dbcur.execute("""CREATE TABLE IF NOT EXISTS {0} (as_ID INT, as_NM TEXT, as_at_ID INT, as_at_NM TEXT, VCF_ID TEXT, VARIANT_ID TEXT, as_at_LINE_SEQ INT, DATE_START DATE, DATE_END DATE, as_at_VAL_SEQ INT, as_at_VA_NM TEXT, as_at_VALUE TEXT);""".format(table_name))
    elif table_type == '2' :
        dbcur.execute("""CREATE TABLE IF NOT EXISTS {0} (as_at_VA_NM TEXT, as_at_VA_DESC TEXT,as_at_DATA_TYPE TEXT);""".format(table_name))     
    conn_obj.commit()
    message = 'Table '+table_name+' created successfully.\n'
    print(message)
    return table_name

# =============================================================================
# function for insert vcf files data in values table
# =============================================================================

def create_tuples(as_id, vcf_id, Variant_id, as_at_nm, sample_id, as_at_va_nm, as_at_value, date_start, date_end, as_va_seq, as_at_va_line_seq,data_table_name):
    as_at_id = '1'
    sample_id = '1'
    variant_id = '1'
    as_nm = 'VCF'
    global datalist
    if (as_at_va_nm in headers_arr)==False:
        return
    #datalist.append("({0},'{1}','{2}','{3}','{4}',{5},'{6}','{7}','{8}','{9}')".format(as_id,str(as_nm),as_at_id,as_at_nm,"",variant_id,as_at_va_line_seq,as_va_seq,as_at_va_nm,as_at_value))
    datalist.append([as_id,str(as_nm),as_at_id,as_at_nm,vcf_id,variant_id,as_at_va_line_seq,as_va_seq,as_at_va_nm,as_at_value])

    if len(datalist)==10000:
        #insertdata(datalist,data_table_name,as_id)
        _thread.start_new_thread( insertdata, (datalist,data_table_name,as_id ) )
        datalist=[]
def insertdata(datalist2,data_table_name,as_id):
    global execution_start_time
    columns = ["as_id","as_nm","as_at_id","as_at_nm","vcf_id","variant_id","as_at_line_seq","as_at_val_seq","as_at_va_nm","as_at_value"]
    df = pd.DataFrame(datalist2,columns=columns)
    df.to_sql(data_table_name,engine,if_exists='append',method=psql_insert_copy, index=False)
    time_now = datetime.datetime.now()
    time_difference = time_now - execution_start_time
    print('Total inserted records:'+str(datalist2[len(datalist2)-1][6])+' in time '+str(time_difference))

def psql_insert_copy(table, conn, keys, data_iter):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()

        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
          table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
                          table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)
# =============================================================================
# function defination / switch case for get recoras variable name & variable value
# =============================================================================
def get_DataFrom_Column(header):
    switcher = {
        "CHROM" : {'variable_name':'CHROM', 'variable_value': record.CHROM},
        "POS" : {'variable_name':'POS', 'variable_value': record.POS},
        "ID" : {'variable_name':'ID', 'variable_value': record.ID},
        "REF" : {'variable_name':'REF', 'variable_value': record.REF},
        "ALT" : {'variable_name':'ALT', 'variable_value': record.ALT},
        "QUAL" : {'variable_name':'QUAL', 'variable_value': record.QUAL},
        "FILTER" : {'variable_name':'FILTER', 'variable_value': record.FILTER},
        "INFO" : {'variable_name':'INFO', 'variable_value': record.INFO},
        "FORMAT" : {'variable_name':'FORMAT', 'variable_value': record.FORMAT}
    } 
    return switcher.get(header, "Invalid header")   
dic={}
def insert_infos_metadata(reader_infos,line_index,file_name,as_at_nm):
    for info_data in reader_infos:
        va_desc = reader_infos[info_data].desc
        arr_len = len(va_desc.split(':'))
        if arr_len > 1 :
            last_str = va_desc.split(':')[arr_len-1]
            dic[info_data]=last_str
    return line_index

# =============================================================================
# global variable declaration
# =============================================================================
index = 0
variableList = []
datalist  = []
headers_arr = []
totalrecordinserted=0
curentRecord=-1
execution_start_time= datetime.datetime.now()                                       # calculate execution start time

# =============================================================================
# get file path or name from user
# =============================================================================

file_path = "/gnomad.exomes.r2.1.1.sites.13.vcf"
if os.path.isfile(file_path) :                                                      #Check file is exists or not on given path
    if True:
        conn = psycopg2.connect(host="localhost",database="vcd", user="postgres", password="pgAdmin")
        cursor = conn.cursor()
        data_table_name = "data2"
        create_table(data_table_name,"1",conn)
        vcf_reader = vcf.Reader(open(file_path, 'r'))
        column_headers = vcf_reader._column_headers
        line_index = 0
        null_date = None
        as_at_header = 'Header'
        as_at_variant = 'Variant' 
        reader_infos = vcf_reader.infos
        line_index = insert_infos_metadata(reader_infos,line_index,file_path,as_at_header)

        execution_start_time= datetime.datetime.now()  
        headers_arr = ['INFO/AF', 'INFO/AF_NFE_SEU', 'INFO/AF_RAW', 'INFO/AF_FIN_FEMALE', 'INFO/AF_NFE_BGR', 'INFO/AF_SAS_MALE', 'INFO/AF_AFR_MALE', 'INFO/AF_AFR', 'INFO/AF_EAS_FEMALE', 'INFO/AF_AFR_FEMALE', 'INFO/AF_SAS', 'INFO/AF_NFE_ONF', 'INFO/AF_FIN_MALE', 'INFO/AF_NFE_FEMALE', 'INFO/AF_AMR', 'INFO/AF_EAS', 'INFO/AF_ASJ_MALE', 'INFO/AF_OTH_FEMALE', 'INFO/AF_NFE_SWE', 'INFO/AF_NFE_NWE', 'INFO/AF_EAS_JPN', 'INFO/AF_FEMALE', 'INFO/AF_EAS_KOR', 'INFO/AF_EAS_OEA', 'INFO/AF_NFE_EST', 'INFO/AF_EAS_MALE', 'INFO/AF_NFE', 'INFO/AF_FIN', 'INFO/AF_NFE_MALE', 'INFO/AF_SAS_FEMALE', 'INFO/AF_ASJ_FEMALE', 'INFO/AF_ASJ', 'INFO/AF_OTH', 'INFO/AF_MALE', 'INFO/AF_AMR_MALE', 'INFO/AF_AMR_FEMALE', 'INFO/AF_OTH_MALE', 'INFO/AF_POPMAX', 'CHROM', 'POS', 'REF', 'ALT', 'QUAL', 'FILTER']
        for record in vcf_reader:
            index += 1
            line_index += 1
            if index == 50000:
                break
            Variant_id = ''
            sample_name = ''
            #for header in column_headers : 
            for header in column_headers :
                if header != 'FORMAT' :
                    header_data_dict = get_DataFrom_Column(header)
                    #print(header_data_dict)
                    header_data_dict_value = header_data_dict['variable_value']
                    #print(header_data_dict_value)
                    if isinstance(header_data_dict_value, str) :
                        create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, header, str(header_data_dict_value), null_date, null_date, 1, str(line_index),data_table_name)
                    elif isinstance(header_data_dict_value, list) :
                        if len(header_data_dict_value) == 0 and header == 'FILTER' :
                            create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, header, 'PASS', null_date, null_date, 1, str(line_index),data_table_name)
                        else :
                            i = 0
                            while i < len(header_data_dict_value) :
                                create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, header, str(header_data_dict_value[i]), null_date, null_date, str(i), str(line_index),data_table_name)
                                i += 1
                    elif isinstance(header_data_dict_value, dict) :
                        for dict_val_record in header_data_dict_value :
                            #print(dict_val_record,'.......',header_data_dict_value[dict_val_record])
                            variable_name = header + '/' + dict_val_record
                            variable_value = header_data_dict_value[dict_val_record]
                            #print('.........',variable_value)
                            variable_seq = 1 
                            if isinstance(variable_value,list) :
                                for value in variable_value :
                                    if dict_val_record in dic :  
                                        header_key_description = vcf_reader.infos[dict_val_record].desc
                                        arr_len = len(header_key_description.split(':'))
                                        last_str = header_key_description.split(':')[arr_len-1]
                                        array_obj_desc_headers = last_str.split('|')
                                        arr_obj_val = value.strip("()[]").split("|")
                                        vararray = dic[dict_val_record].split("|")
                                        arr_index = 0
                                        for desc_header_value in arr_obj_val :
                                            variable_name = header+'/'+dict_val_record+'/'+vararray[arr_index].lstrip().replace("'","")
                                            variable_value = arr_obj_val[arr_index]
                                            arr_index += 1
                                            create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, variable_name, variable_value, null_date, null_date, str(variable_seq), str(line_index),data_table_name)
                                    else :
                                          create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, variable_name, value, null_date, null_date, str(variable_seq), str(line_index),data_table_name)

                                    variable_seq += 1  
                            else :
                                create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, variable_name, variable_value, null_date, null_date, str(variable_seq), str(line_index),data_table_name)
                    else :
                        variable_name = header
                        variable_value = str(header_data_dict_value)
                        create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, variable_name, variable_value, null_date, null_date, 1, str(line_index),data_table_name)
                else :
                    format_data_arr = (record.FORMAT).split(":")
                    for format_data_record in format_data_arr :
                        variable_name = header+'/'+format_data_record
                        variable_value = record.genotype(record_data.sample)[format_data_record]
                        create_tuples("1", file_path, Variant_id, as_at_variant, sample_name, variable_name, variable_value, null_date, null_date, 1,str(line_index),data_table_name)

        insertdata(datalist,data_table_name,"1")
        print('Variants inserted successfully.')
    else:
        print('Incorrect Choice.')
else:
    print('File not present.')

Here is the output tables:

as_id   as_nm   as_at_id   as_at_nm vcf_id  variant_id  as_at_line_seq  date_start  date_end    as_at_val_seq   as_at_va_nm                         as_at_value
1       VCF     1           Variant ""      1           1                                       1               CHROM                               22
1       VCF     1           Variant ""      1           1                                       1               POS                                 16050036
1       VCF     1           Variant ""      1           1                                       1               ID                                  rs374742143
1       VCF     1           Variant ""      1           1                                       1               REF                                 A
1       VCF     1           Variant ""      1           1                                       0               ALT                                 C
1       VCF     1           Variant ""      1           1                                       1               QUAL                                442156.34
1       VCF     1           Variant ""      1           1                                       0               FILTER                              RF
1       VCF     1           Variant ""      1           1                                       1               INFO/non_neuro_nhomalt_amr_male     0
1       VCF     1           Variant ""      1           1                                       1               INFO/controls_AN_afr                4
1       VCF     1           Variant ""      1           1                                       1               INFO/controls_AC_amr_female         3
1       VCF     1           Variant ""      1           1                                       1               INFO/AF_oth_female                  0.5
1       VCF     1           Variant ""      1           1                                       1               INFO/non_topmed_nhomalt_afr_female  0

It is taking around 5 minutes 43 seconds to process 50,000 records. I am not able to understand why it is taking this long.

Can someone help me optimize this code?

Timur Shtatland
  • 12,024
  • 2
  • 30
  • 47
dang
  • 2,342
  • 5
  • 44
  • 91
  • Hi, did you do some profiling (e.g. if the bulk time is spent on the inserts?) Also would you consider providing smaller (not 1Gb) example data? – Marek Schwarz Feb 07 '20 at 15:46
  • I tried finding the time it takes to process. However, I don't have the stats with me. If the set is small, my code works all fine, however when the file is big, it starts creating issue. – dang Feb 07 '20 at 15:49
  • I also thing that you might find this interesting https://stackoverflow.com/questions/8134602/psycopg2-insert-multiple-rows-with-one-query. If the bottleneck is truly the database. – Marek Schwarz Feb 07 '20 at 15:50
  • I am using DataFrame to insert the data. – dang Feb 07 '20 at 15:52
  • Yes, I've noticed and I think it is less then ideal regarding the performance. (here is link on how to do profiling in python https://stackoverflow.com/a/582337/9327628) – Marek Schwarz Feb 07 '20 at 15:53
  • If you only do one insert at a time the performance you're seeing is reasonable/expected. If you can output your data to a CSV and then COPY INTO the database it'll go very fast without getting too bogged down with specifics. First things first, make `create_tuples` and `insertdata` empty functions/no ops and see how much of the time is spent in parsing. If that fixes your problem you can modify your functions to output CSV files and then quickly import those into PostgreSQL. Both of @MarekSchwarz's links should prove helpful. – Jeff Feb 09 '20 at 18:13
  • PyVCF has a similar API to the CSV module. I would recommend converting the VCF to CSV and then importing the CSV directly into PostgreSQL. It will be fast and require less code. – Jeff Feb 09 '20 at 18:20

2 Answers2

0

To speed up inserts, within a transaction, you can adjust the work_mem and/or maintenance_work_mem. Also, this this answer explains a little more in depth

0

Check this out, there is a good insight on inserting data even upto 100K\sec.

Link

  • Create UNLOGGED table. This reduces the amount of data written to persistent storage by up to 2x.
  • Set WITH (autovacuum_enabled=false) on the table. This saves CPU time and IO bandwidth on useless vacuuming of the table (since we never DELETE or UPDATE the table).

  • Insert rows with COPY FROM STDIN. This is the fastest possible approach to insert rows into table.

  • Minimize the number of indexes in the table, since they slow down inserts. Usually an index on time timestamp with time zone is enough. Add synchronous_commit = off to postgresql.conf.

  • Use table inheritance for fast removal of old data:
Raj Verma
  • 1,050
  • 1
  • 7
  • 19