I need to update more than 1B rows in a PostgreSQL table that has some indexes. I am working with PostgreSQL 12 + SQLAlchemy + Python.
Inspired by the answers here, I wrote an updater based on a temp table and UPDATE ... FROM to see if it makes a difference. The temp table is fed from a CSV generated by Python and uploaded over the normal SQL client connection.
The speed-up over the naive approach using SQLAlchemy's bulk_update_mappings() is 4x - 5x. Not an order of magnitude, but still considerable, and in my case it means one day, not one week, for a batch job.
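For reference, a minimal sketch of the naive baseline I measured against is below. It assumes SQLAlchemy 1.4+ and a mapped Swap model standing in for my real "swap" table; only the columns touched by the update are shown, and the model/function names are placeholders for this sketch.

# Sketch of the naive baseline, assuming SQLAlchemy 1.4+; Swap is a stand-in
# mapped model for the real "swap" table with only the updated columns.
from typing import List

from sqlalchemy import Column, Integer, LargeBinary
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Swap(Base):
    __tablename__ = "swap"
    id = Column(Integer, primary_key=True)
    sync_event_id = Column(Integer)
    sync_reserve0 = Column(LargeBinary)
    sync_reserve1 = Column(LargeBinary)


def bulk_update_naive(dbsession: Session, data_as_dicts: List[dict]):
    # Each dict carries the primary key ("id") plus the columns to change;
    # the ORM turns this into batched per-row UPDATE statements.
    dbsession.bulk_update_mappings(Swap, data_as_dicts)
    dbsession.commit()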
Below is the relevant Python code that does CREATE TEMPORARY TABLE, COPY FROM and UPDATE ... FROM. See the full example in this gist.
import csv
import logging
from io import StringIO
from typing import List

from sqlalchemy.orm import Session

logger = logging.getLogger(__name__)


def bulk_load_psql_using_temp_table(
    dbsession: Session,
    data_as_dicts: List[dict],
):
"""Bulk update columns in PostgreSQL faster using temp table.
Works around speed issues on `bulk_update_mapping()` and PostgreSQL.
Your mileage and speed may vary, but it is going to be faster.
The observation was 3x ... 4x faster when doing UPDATEs
where one of the columns is indexed.
Contains hardcoded temp table creation and UPDATE FROM statements.
In our case we are bulk updating three columns.
- Create a temp table - if not created before
- Filling it from the in-memory CSV using COPY FROM
- Then performing UPDATE ... FROM on the actual table from the temp table
- Between the update chunks, clear the temp table using TRUNCATE
Why is it faster? I have did not get a clear answer from the sources I wa reading.
At least there should be
less data uploaded from the client to the server,
as CSV loading is more compact than bulk updates.
Further reading
- `About PSQL temp tables <https://www.postgresqltutorial.com/postgresql-tutorial/postgresql-temporary-table/>`_
- `Naive bulk_update_mapping approach <https://stackoverflow.com/questions/36272316/using-bulk-update-mappings-in-sqlalchemy-to-update-multiple-rows-with-different>`_
- `Discussion on UPDATE ... FROM + temp table approach <https://stackoverflow.com/questions/3361291/slow-simple-update-query-on-postgresql-database-with-3-million-rows/24811058#24811058>_`.
:dbsession:
SQLAlchemy session.
Note that we open a separate connection for the bulk update.
:param data_as_dicts:
In bound data as it would be given to bulk_update_mapping
"""
    # The temp table created in SQL
    temp_table_name = "temp_bulk_temp_loader"

    # The real table whose data we are updating
    real_table_name = "swap"

    # Columns we need to copy
    columns = ["id", "sync_event_id", "sync_reserve0", "sync_reserve1"]

    # How our CSV fields are separated
    delim = ";"

    # Increase temp buffer size for updates
    temp_buffer_size = "3000MB"

    # Dump data to a local mem buffer using CSV writer.
    # No header - this is specifically addressed in copy_from()
    out = StringIO()
    writer = csv.DictWriter(out, fieldnames=columns, delimiter=delim)
    writer.writerows(data_as_dicts)

    # Update data over a separate raw connection
    engine = dbsession.bind
    conn = engine.connect()

    try:
        # No rollbacks
        conn.execution_options(isolation_level="AUTOCOMMIT")

        # See https://blog.codacy.com/how-to-update-large-tables-in-postgresql/
        conn.execute(f"""SET temp_buffers = "{temp_buffer_size}";""")

        # Temp table is dropped at the end of the session
        # https://www.postgresqltutorial.com/postgresql-tutorial/postgresql-temporary-table/
        # This must match the data_as_dicts structure.
        sql = f"""
        CREATE TEMP TABLE IF NOT EXISTS {temp_table_name}
        (
            id int,
            sync_event_id int,
            sync_reserve0 bytea,
            sync_reserve1 bytea
        );
        """
        conn.execute(sql)

        # Clean any pending data in the temp table
        # between update chunks.
        # TODO: Not sure why this does not clear itself at conn.close()
        # as I would expect based on the documentation.
        sql = f"TRUNCATE {temp_table_name}"
        conn.execute(sql)

        # Load data from the CSV buffer to the temp table
        # https://www.psycopg.org/docs/cursor.html
        cursor = conn.connection.cursor()
        out.seek(0)
        cursor.copy_from(out, temp_table_name, sep=delim, columns=columns)

        # Fill the real table from the temp table.
        # This copies values from the temp table using
        # UPDATE ... FROM, matching rows by id.
        sql = f"""
        UPDATE {real_table_name}
        SET
            sync_event_id=b.sync_event_id,
            sync_reserve0=b.sync_reserve0,
            sync_reserve1=b.sync_reserve1
        FROM {temp_table_name} AS b
        WHERE {real_table_name}.id=b.id;
        """
        res = conn.execute(sql)

        logger.debug("Updated %d rows", res.rowcount)
    finally:
        conn.close()
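For completeness, here is roughly how I drive it from the batch job. The chunk size and the update_in_chunks() helper are placeholder names I made up for this sketch; the important part is that each call TRUNCATEs and refills the temp table before issuing a single UPDATE ... FROM.

# Hypothetical driver loop; CHUNK_SIZE and update_in_chunks() are illustrative
# names, not part of the function above.
CHUNK_SIZE = 100_000


def update_in_chunks(dbsession, all_updates):
    # all_updates yields dicts shaped like
    # {"id": 1, "sync_event_id": 123, "sync_reserve0": b"...", "sync_reserve1": b"..."}
    chunk = []
    for row in all_updates:
        chunk.append(row)
        if len(chunk) >= CHUNK_SIZE:
            bulk_load_psql_using_temp_table(dbsession, chunk)
            chunk = []
    if chunk:
        bulk_load_psql_using_temp_table(dbsession, chunk)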