
I am attempting to query a subset of a MySQL database table, feed the results into a pandas DataFrame, alter some data, and then write the updated rows back to the same table. My table has ~1 million rows, and the number of rows I will be altering is relatively small (<50,000), so bringing back the entire table and performing a df.to_sql(tablename, engine, if_exists='replace') isn't a viable option. Is there a straightforward way to UPDATE the rows that have been altered without iterating over every row in the DataFrame?

I am aware of this project, which attempts to simulate an "upsert" workflow, but it seems it only accomplishes the task of inserting new non-duplicate rows rather than updating parts of existing rows:

GitHub Pandas-to_sql-upsert

Here is a skeleton of what I'm attempting to accomplish on a much larger scale:

import pandas as pd
from sqlalchemy import create_engine

#Get sample data
d = {'A' : [1, 2, 3, 4], 'B' : [4, 3, 2, 1]}
df = pd.DataFrame(d)

engine = create_engine(SQLALCHEMY_DATABASE_URI)

#Create a table with a unique constraint on A.
engine.execute("""DROP TABLE IF EXISTS test_upsert """)
engine.execute("""CREATE TABLE test_upsert (
                  A INTEGER,
                  B INTEGER,
                  PRIMARY KEY (A)) 
                  """)

#Insert data using pandas.to_sql
df.to_sql('test_upsert', engine, if_exists='append', index=False)

# Read the inserted rows back from the database
df_in_db = pd.read_sql('SELECT * FROM test_upsert', engine)

# Alter row where 'A' == 2
df_in_db.loc[df_in_db['A'] == 2, 'B'] = 6

Now I would like to write df_in_db back to my 'test_upsert' table with the updated data reflected.

This SO question is very similar, and one of the comments recommends using an "sqlalchemy table class" to perform the task.

Update table using sqlalchemy table class

Can anyone expand on how I would implement this for my specific case above if that is the best (only?) way to implement it?
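For readers landing here, the "sqlalchemy table class" approach from that linked question boils down to declaring (or reflecting) the table and issuing `update()` statements against it. A minimal sketch, assuming SQLAlchemy 1.4+ and using an in-memory SQLite DB in place of MySQL (table and column names follow the skeleton above):

```python
# Sketch of the SQLAlchemy table-object approach; SQLite stands in for MySQL.
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
test_upsert = sa.Table(
    "test_upsert", meta,
    sa.Column("A", sa.Integer, primary_key=True),
    sa.Column("B", sa.Integer),
)
meta.create_all(engine)

with engine.begin() as conn:
    # seed two rows, then update the row whose key A == 2
    conn.execute(test_upsert.insert(), [{"A": 1, "B": 4}, {"A": 2, "B": 3}])
    conn.execute(test_upsert.update().where(test_upsert.c.A == 2).values(B=6))

with engine.connect() as conn:
    rows = conn.execute(sa.select(test_upsert).order_by(test_upsert.c.A)).fetchall()
```

Each changed row becomes one `UPDATE ... WHERE key = ...` statement, so this still executes per-row, but inside a single transaction.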

D Clancy

4 Answers


I think the easiest way would be to first delete the rows that are going to be "upserted", then insert the changed rows. The deletion can be done in a loop, but that's not very efficient for bigger data sets (5K+ rows), so I'd save this slice of the DF into a temporary MySQL table:

# assuming we have already changed values in the rows and saved those changed rows in a separate DF: `x`
x = df[mask]  # `mask` should help us to find changed rows...

# make sure `x` DF has a Primary Key column as index
x = x.set_index('a')

# dump a slice with changed rows to temporary MySQL table
x.to_sql('my_tmp', engine, if_exists='replace', index=True)

conn = engine.connect()
trans = conn.begin()

try:
    # delete those rows that we are going to "upsert"
    conn.execute('delete from test_upsert where a in (select a from my_tmp)')

    # insert changed rows (passing `conn` keeps this in the same transaction)
    x.to_sql('test_upsert', conn, if_exists='append', index=True)
    trans.commit()
except:
    trans.rollback()
    raise

P.S. I didn't test this code, so it might have some small bugs, but it should give you an idea...
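The `mask` in the first snippet is left abstract in the answer. Assuming a pristine copy of the frame was kept before editing, one way to compute the changed slice is to compare the two frames (the names `before` and `after` below are illustrative, not from the answer):

```python
import pandas as pd

# hypothetical before/after frames sharing the key column 'A'
before = pd.DataFrame({"A": [1, 2, 3, 4], "B": [4, 3, 2, 1]}).set_index("A")
after = before.copy()
after.loc[2, "B"] = 6

# rows where any column differs from the original
mask = after.ne(before).any(axis=1)
x = after[mask]  # the changed slice to dump into `my_tmp`
```

`x` then contains only the row with key `A == 2`, ready for `x.to_sql('my_tmp', ...)`.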

MaxU - stand with Ukraine
  • Just implemented this and had to change one thing: `engine` doesn't have a `commit` method. I got rid of that line altogether and it works. – raddevon Jul 13 '17 at 18:28
  • @raddevon, thank you for your comment! This solution has been tested against a MySQL DB. When we create an engine for a MySQL DB, SQLAlchemy returns an object which has a `.commit()` method... – MaxU - stand with Ukraine Jul 13 '17 at 19:00
  • You're welcome! Sounds good. My DB is Postgres. SQLAlchemy's Postgres engine class must not have the commit method. – raddevon Jul 13 '17 at 19:02
  • @raddevon, I've updated my answer - this new version should work with any DB supported by SQLAlchemy... – MaxU - stand with Ukraine Jul 13 '17 at 19:12
  • Better hope there aren't any foreign keys attached to a primary key on the table where you are deleting rows. That could leave a messy situation... – JBB Jan 09 '19 at 22:23
  • What if there is an auto-increment key in the table? Replacing or deleting rows seems like a risky solution. – Nasif Imtiaz Ohi Jun 12 '20 at 02:06

A MySQL-specific solution using pandas' to_sql `method` arg and SQLAlchemy's MySQL `insert ... on_duplicate_key_update` feature:

import sqlalchemy as db
import sqlalchemy.dialects.mysql  # so that `db.dialects.mysql` resolves

def create_method(meta):
    def method(table, conn, keys, data_iter):
        sql_table = db.Table(table.name, meta, autoload=True)
        insert_stmt = db.dialects.mysql.insert(sql_table).values(
            [dict(zip(keys, data)) for data in data_iter])
        upsert_stmt = insert_stmt.on_duplicate_key_update(
            {x.name: x for x in insert_stmt.inserted})
        conn.execute(upsert_stmt)

    return method

engine = db.create_engine(...)
conn = engine.connect()
with conn.begin():
    meta = db.MetaData(conn)
    method = create_method(meta)
    df.to_sql(table_name, conn, if_exists='append', method=method)
patrick
  • Important to note that this will result in *delete and append*, instead of the usual update if duplicate. I'm not sure why though. – Ilyas Foo Jul 08 '20 at 06:42
  • What's `db.` in this context? SQLAlchemy tends to be imported alone as `create_engine`, so I'm wondering if it's something other than pymysql, which has its own extension? – Chris McKee Sep 02 '21 at 21:32
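For reference, the upsert statement this `method` compiles to has roughly the following shape on MySQL (the pre-8.0.20 `VALUES()` syntax, which is what `insert_stmt.inserted` renders as), shown here for the two-column table from the question:

```sql
INSERT INTO test_upsert (A, B)
VALUES (2, 6)
ON DUPLICATE KEY UPDATE A = VALUES(A), B = VALUES(B);
```

Rows whose primary key already exists are updated in place; new keys are inserted, so nothing is deleted.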

I was struggling with this before and now I've found a way.

Basically, create a separate DataFrame in which you keep only the rows you have to update.

df  # DataFrame holding only the rows to update

s_update = ""  # accumulated UPDATE statements

# Loop through the data frame (column names below are placeholders:
# `value_col` is the column to change, `key_col` identifies the row)
for i in range(len(df)):
    s_update += "UPDATE your_table_name SET value_col = '%s' WHERE key_col = '%s';" % (
        df['value_col'][i], df['key_col'][i])

Now pass s_update to cursor.execute or engine.execute (wherever you execute SQL queries).

This will update your data in one shot, but be aware that interpolating values into SQL strings like this is vulnerable to SQL injection and breaks on values containing quotes; parameter binding is safer.
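A parameterized variant of the same loop sidesteps the quoting and injection pitfalls. The sketch below uses the stdlib `sqlite3` module with hypothetical data; a MySQL driver such as PyMySQL works the same way, with `%s` placeholders instead of `?`:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (A INTEGER PRIMARY KEY, B INTEGER)")
conn.executemany("INSERT INTO t VALUES (?, ?)", [(1, 4), (2, 3), (3, 2)])

# parameterized batch UPDATE: (new value, key) pairs, e.g. built from a DataFrame
updates = [(6, 2), (9, 3)]
conn.executemany("UPDATE t SET B = ? WHERE A = ?", updates)
conn.commit()

rows = conn.execute("SELECT A, B FROM t ORDER BY A").fetchall()
```

`executemany` sends one statement shape with many parameter sets, so the driver handles all escaping.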

Xiddoc

Here is a general function that will update each row (but all values in the row simultaneously):

def update_table_from_df(df, table, where):
    '''Take a dataframe and update each specified row in the SQL table
       with the DF values -- DF columns MUST match SQL columns.
       WHERE statement should be a triple-quoted string.
       Will not update any columns contained in the WHERE statement.'''
    for idx, row in df.iterrows():
        # build one "col = value" assignment per column, quoting strings
        assignments = []
        for col in df.columns:
            if col != 'datetime' and col not in where:
                if isinstance(row[col], str):
                    assignments.append(f"{col} = '{row[col]}'")
                else:
                    assignments.append(f"{col} = {row[col]}")
        upstr = f"UPDATE {table} SET " + ", ".join(assignments) + " " + where
        cursor.execute(upstr)
        cursor.commit()  # `cursor.commit()` is pyodbc-specific; other drivers commit on the connection