
I have a massive table (over 100B records) to which I added an empty column. I parse another (string) field, and when the expected substring is present I extract an integer from it; I want to write that integer into the new column for every row that contains the substring.

At the moment, after the data has been parsed and saved locally in a dataframe, I iterate over it to update the Redshift table with the clean values. This takes approximately 1 second per row, which is far too slow.

My current code example:

import psycopg2

conn = psycopg2.connect(connection_details)
cur = conn.cursor()
clean_df = raw_data.apply(clean_field_to_parse)
for ind, row in clean_df.iterrows():
    update_query = build_update_query(row.id, row.clean_int_1, row.clean_int_2)
    cur.execute(update_query)
conn.commit()

where build_update_query is the function that generates the update query:

def build_update_query(id, int1, int2):
    query = """
    update tab_tab
    set
        clean_int_1 = {}::int,
        clean_int_2 = {}::int,
        updated_date = GETDATE()
    where id = {}
    ;
    """
    return query.format(int1, int2, id)

and where clean_df is structured like:

id    field_to_parse      clean_int_1    clean_int_2
1     {'int_1': '2+1'}    3              np.nan
2     {'int_2': '7-0'}    np.nan         7

Is there a way to update specific table fields in bulk, so that there is no need to execute one query at a time?

I'm parsing the strings and running the update statement from Python. The database is stored on Redshift.

guyts
  • 100B is 100 _billion_? You really _don't_ want to be parsing that df line-by-line, let alone running single queries. What does `parse_single_row()` do? Any solution must first address that before then doing inserts in bulk – roganjosh Nov 11 '19 at 19:58
  • Consider pushing data frame as a temp table to run a [UPDATE...FROM](https://docs.aws.amazon.com/redshift/latest/dg/c_Examples_of_UPDATE_statements.html) between two tables. – Parfait Nov 11 '19 at 20:00
  • @roganjosh parse_single_row takes in a json from `field_to_parse` and extracts valuable info from there. This is not the time-consuming task here - locally it runs in a reasonable amount of time. And yes - 100B is billion. This will be running for a few days, which is fine, but at the current rate of update (1 second per row, this will take longer than a few days). – guyts Nov 11 '19 at 20:02
  • @Parfait will look into that – guyts Nov 11 '19 at 20:03
  • The whole point of pandas is not to iterate dataframes though. Batch processing alone will probably be several orders of magnitude faster, that's before you then increase the update efficiency of the table. You should include the function anyway, it's not possible for me to understand from just a description. Also "this will take longer than a few days" is the understatement of the year :P – roganjosh Nov 11 '19 at 20:05
  • @roganjosh the dataframe can be cleaned in bulk. The only reason at the moment this is not happening is because I defaulted to updating the table row-by-row. If I can update the table in bulk - I will not be iterating through the df – guyts Nov 11 '19 at 20:06
  • 100b rows at 1row a second will take around 3k years. for this sort of big data you should look into Hadoop or Pyspark. – Umar.H Nov 11 '19 at 20:13
  • @roganjosh I've updated the question for clarity. – guyts Nov 11 '19 at 20:19
  • Thanks. Also, apologies, I misread your last comment – roganjosh Nov 11 '19 at 20:20
  • [This](http://www.silota.com/blog/amazon-redshift-upsert-support-staging-table-replace-rows/) is the type of approach I was going to go for since I knew it was based on Postgres but it doesn't seem to be well supported. I have a [recent answer](https://stackoverflow.com/a/58664330/4799172) that shows an approach to keep a file in memory only and copy into Postgres. I don't think the exact `COPY` approach will be the same, but putting them together they may give you several pieces of your puzzle – roganjosh Nov 11 '19 at 20:35
  • So, batch the DF, process the column in the batch as a whole, convert the batch to an in-memory file, create a staging table, copy the file to the staging table and then join across to the main table. Quite a bit of faffing but hopefully significantly faster (a sketch of this appears after the comments) – roganjosh Nov 11 '19 at 20:37
  • https://docs.aws.amazon.com/redshift/latest/dg/json-functions.html <- You could just write a Redshift query and run it without having to do it with Python. This will be the most productive way to do it, IMHO – GSazheniuk Nov 11 '19 at 20:38
  • Agree entirely with @GSazheniuk. Find a way to do it directly in SQL. Even if you have to write your data out into a table that can be used in the update, nothing is going to beat letting the database do its thing. Databases, and sql, are designed to work on sets of data. – TomC Nov 11 '19 at 23:28
  • @Parfait if you want to post your solution in a more detailed way - I've used the Update..From, it works fine – guyts Nov 12 '19 at 14:09
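
As roganjosh's comment notes, the exact COPY mechanics differ between plain Postgres and Redshift. The following is only a minimal sketch of the in-memory-file idea as it would look against plain Postgres with psycopg2 (COPY ... FROM STDIN); the staging table name stage_clean is made up for the example, and on Redshift the load step would instead go through S3 (see the answer below).

import io

import psycopg2

conn = psycopg2.connect(connection_details)  # same connection details as in the question
cur = conn.cursor()

# staging table shaped like the cleaned dataframe (name is illustrative)
cur.execute("""
    CREATE TEMP TABLE stage_clean (
        id          BIGINT,
        clean_int_1 INT,
        clean_int_2 INT
    )
""")

# nullable Int64 keeps missing values empty in the CSV instead of writing 3.0 / NaN
out_cols = clean_df[["id", "clean_int_1", "clean_int_2"]].astype("Int64")

# stream one batch through an in-memory CSV buffer rather than issuing per-row queries
buf = io.StringIO()
out_cols.to_csv(buf, index=False, header=False)
buf.seek(0)
cur.copy_expert("COPY stage_clean (id, clean_int_1, clean_int_2) FROM STDIN WITH CSV", buf)

# one set-based update joins the staging table to the big table
cur.execute("""
    UPDATE tab_tab t
    SET clean_int_1 = s.clean_int_1,
        clean_int_2 = s.clean_int_2,
        updated_date = now()   -- the question's Redshift query uses GETDATE() here
    FROM stage_clean s
    WHERE s.id = t.id
""")
conn.commit()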

1 Answer


As mentioned in the comments, consider pure SQL and avoid iterating through billions of rows: push the Pandas data frame to the database as a staging table and then run one single UPDATE across both tables. With SQLAlchemy you can use DataFrame.to_sql to create a table replica of the data frame, then drop the staging table at the end. (On plain Postgres you could also add an index on the join field, id; Redshift does not support indexes and relies on distribution and sort keys instead.)

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://myuser:mypwd@myhost/mydatabase")

# PUSH DATA FRAME TO THE DATABASE AS A STAGING TABLE (SAME NAME AS DF)
clean_df.to_sql(name="clean_df", con=engine, if_exists="replace", index=False)

# SQL UPDATE (USING TRANSACTION)
with engine.begin() as conn:

    # Postgres only: index the join key; Redshift does not support CREATE INDEX
    # conn.execute(text("CREATE INDEX idx_clean_df_id ON clean_df (id)"))

    # one set-based UPDATE instead of one query per row
    sql = """UPDATE tab_tab t
             SET clean_int_1 = c.clean_int_1,
                 clean_int_2 = c.clean_int_2,
                 updated_date = GETDATE()
             FROM clean_df c
             WHERE c.id = t.id
          """
    conn.execute(text(sql))

    # remove the staging table when done
    conn.execute(text("DROP TABLE IF EXISTS clean_df"))

engine.dispose()
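
One note on scale: to_sql pushes the staging rows as INSERT statements over the connection, which can itself be slow for a large cleaned batch. Redshift's native bulk-load path is COPY from Amazon S3 into the staging table, followed by the same UPDATE ... FROM. A minimal sketch, assuming a hypothetical bucket, key and IAM role (none of them from the question) and reusing engine and text from the code above:

import boto3

# nullable Int64 so missing values stay empty in the CSV rather than becoming 3.0 / NaN
stage = clean_df[["id", "clean_int_1", "clean_int_2"]].astype("Int64")
stage.to_csv("clean_df.csv", index=False, header=False)

# hypothetical bucket/key; Redshift COPY reads from S3, not from the client
boto3.client("s3").upload_file("clean_df.csv", "my-staging-bucket", "staging/clean_df.csv")

with engine.begin() as conn:
    conn.execute(text("""
        CREATE TEMP TABLE clean_stage (
            id          BIGINT,
            clean_int_1 INT,
            clean_int_2 INT
        )
    """))

    # hypothetical IAM role attached to the cluster with read access to the bucket
    conn.execute(text("""
        COPY clean_stage
        FROM 's3://my-staging-bucket/staging/clean_df.csv'
        IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-role'
        CSV
    """))

    conn.execute(text("""
        UPDATE tab_tab t
        SET clean_int_1 = c.clean_int_1,
            clean_int_2 = c.clean_int_2,
            updated_date = GETDATE()
        FROM clean_stage c
        WHERE c.id = t.id
    """))

Because a Redshift UPDATE is internally a delete plus insert, updating a large fraction of a 100B-row table leaves ghost rows behind, so it is usually worth running VACUUM and ANALYZE on tab_tab afterwards.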
Parfait