
I have a CSV file named "real_acct" in a folder and a Postgres table (also called real_acct) on my Postgres server. The CSV file has some records updated and some new records added. I want to compare the CSV file against the Postgres table: if the CSV file contains a new row, it should be added to the Postgres table, and if a record has been updated (for example, the column "no of bedrooms" for account=923 has changed), that update should be applied to the Postgres table as well.

I am doing this with Python. The table on the Postgres server has 1,100,000+ rows, and the CSV file on the server has roughly as many rows.

import csv
import glob

import psycopg2


def update_records(connection, csv_source, table_name, encoding='latin-1'):
    # NOTE: this reads the whole CSV into memory at once.
    with open(glob.glob(f"{csv_source}/*{table_name.lower()}.csv")[0], 'r', encoding=encoding) as f:
        reader = csv.DictReader(f)
        read_csv = [row for row in reader]

    try:
        cur = connection.cursor()
        # A client-side cursor buffers the whole result set; this is only used for the column names.
        cur.execute(f"SELECT * FROM {table_name.lower()}")
        table_columns = [col[0] for col in cur.description]

        for csv_row in read_csv:
            cur.execute("SELECT * FROM real_acct WHERE account = %s", [csv_row['account']])
            db_row = cur.fetchone()
            if not db_row:
                # New account in the CSV: insert the full row.
                insert_query = (f"INSERT INTO real_acct ({', '.join(table_columns)}) "
                                f"VALUES ({', '.join(['%s'] * len(table_columns))})")
                values = [csv_row.get(col) for col in table_columns]
                cur.execute(insert_query, values)
            else:
                # Existing account: update only the columns whose values differ.
                set_clauses = []
                values = []
                for col in table_columns:
                    if csv_row.get(col) != '':
                        if csv_row[col] != str(db_row[table_columns.index(col)]):
                            set_clauses.append(f"{col} = %s")
                            values.append(csv_row[col])
                    else:
                        # An empty string in the CSV means the column should be set to NULL.
                        set_clauses.append(f"{col} = NULL")
                if set_clauses:
                    update_query = f"UPDATE real_acct SET {', '.join(set_clauses)} WHERE account = %s"
                    values.append(csv_row['account'])
                    cur.execute(update_query, values)
        connection.commit()
        cur.close()
    except psycopg2.DatabaseError as error:
        raise ValueError(error)

The problem I run into is a SIGKILL: memory usage gets too high and the DAG I'm running stops with a Negsignal.SIGKILL error. Pandas seems like an even worse option for reading SQL tables.
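For reference, iterating the reader directly instead of building the read_csv list would keep only one CSV row in Python memory at a time (though it still costs one SELECT round trip per row). A minimal sketch, with stream_csv_rows as an illustrative helper name and the same connection, table, and per-row logic as above:

import csv
import glob

def stream_csv_rows(csv_source, table_name, encoding='latin-1'):
    # Yield one row at a time instead of building a list of the whole file.
    path = glob.glob(f"{csv_source}/*{table_name.lower()}.csv")[0]
    with open(path, 'r', encoding=encoding, newline='') as f:
        for row in csv.DictReader(f):
            yield row

# Replaces `read_csv = [row for row in reader]` in update_records:
# for csv_row in stream_csv_rows(csv_source, table_name):
#     ...  # same per-row SELECT / INSERT / UPDATE handling as above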

    Import the CSV file into a staging table. Then run an `INSERT ... ON CONFLICT (...) DO UPDATE` from the staging table to the real table. –  Apr 12 '23 at 06:27
  • If I understand this correctly, it is loading the CSV file into memory (probably very large in size) that is causing the OS to send SIGKILL to the process. Pandas lets us read the CSV file in chunks. Look at this [answer](https://stackoverflow.com/questions/25962114/how-do-i-read-a-large-csv-file-with-pandas) – satinder singh Apr 12 '23 at 10:05
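A rough sketch of the staging-table suggestion from the first comment, assuming real_acct has a unique constraint on account, the CSV column order matches the table's column order, and no_of_bedrooms stands in for whatever columns actually need refreshing:

import psycopg2

def upsert_via_staging(connection, csv_path, encoding='latin-1'):
    with connection.cursor() as cur:
        # Staging table with the same shape as real_acct, dropped automatically on commit.
        cur.execute("""
            CREATE TEMP TABLE real_acct_staging
            (LIKE real_acct INCLUDING DEFAULTS) ON COMMIT DROP
        """)
        # COPY streams the file to the server in chunks, so it is never held in Python memory all at once.
        with open(csv_path, 'r', encoding=encoding) as f:
            cur.copy_expert(
                "COPY real_acct_staging FROM STDIN WITH (FORMAT csv, HEADER true)", f
            )
        # Insert new accounts, update existing ones; needs a unique index on real_acct(account).
        cur.execute("""
            INSERT INTO real_acct
            SELECT * FROM real_acct_staging
            ON CONFLICT (account) DO UPDATE
                SET no_of_bedrooms = EXCLUDED.no_of_bedrooms  -- repeat for each column to refresh
        """)
    connection.commit()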
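And a minimal sketch of the chunked read from the second comment, where csv_path stands for the globbed file path and the chunk size is just a guess to tune against available memory:

import pandas as pd

# Each chunk is a small DataFrame, so only ~50,000 rows are in memory at a time.
for chunk in pd.read_csv(csv_path, encoding='latin-1', chunksize=50_000, dtype=str):
    for csv_row in chunk.to_dict(orient='records'):
        ...  # same per-row SELECT / INSERT / UPDATE as in update_records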
