(NB: I don't consider myself a dev, DBA or anything like that - I'm learning by doing. I have lots of (old) general programming experience, but Python and SQL are very new to me.)
I have seen this question but wonder whether 2+ years later things are now different.
I have an MSSQL database of ~20M rows, ~10 columns (but I will be adding more columns).
The source data was imported from CSV with empty values wherever there was no change between rows, so I have just used a pandas DataFrame to read in the data, fill forward and write it back. I use DataFrame.to_sql to write to a temporary table (replacing it if it exists each time), then use SQL to update the main table. Updating 100k rows from the temp table takes <1 s, but DataFrame.to_sql takes about 3 minutes for 100k rows.
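In outline, one pass of what I'm doing looks like this (a condensed, untested sketch of the full code at the bottom - the DSN name and row range are just placeholders):

import pandas as pd
import sqlalchemy as sqla

engine = sqla.create_engine('mssql+pyodbc://@MyDSN')   # 'MyDSN' is a placeholder for my actual DSN
srcTable, tempTable = 'USDCHF Transformed', 'tempNasFilledTable'

# 1. read a chunk of rows (these still contain the empty values from the CSV import)
df = pd.read_sql_query('SELECT * FROM [' + srcTable + '] WHERE Rownumber BETWEEN 1 AND 100000', engine)

# 2. fill forward in pandas
df = df.fillna(method='ffill')

# 3. write the filled chunk to a temp table - this is the slow step (~3 min per 100k rows)
df.to_sql(tempTable, engine, if_exists='replace', index=False, chunksize=100)

# 4. update the main table from the temp table - this is the fast step (<1 s per 100k rows)
with engine.begin() as conn:
    conn.execute('UPDATE [' + srcTable + '] SET [City] = t.[City], [Region] = t.[Region]'
                 ' FROM [' + tempTable + '] AS t'
                 ' WHERE [' + srcTable + '].[Rownumber] = t.[Rownumber]')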
It will take about 8 hours to process the whole DB this way, and whilst that's OK for now (I'm learning how to wrangle dataframes, SQL, etc.), I need a speed-up of at least two orders of magnitude.
The known speed issues are:
- T-SQL limits inserts to 1000 rows per INSERT ... VALUES statement
- DataFrame.to_sql needs a small chunk size (currently 100) if I am to avoid pyodbc errors (including one about a negative number of parameters!) - see the sketch just below this list
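For reference, the chunk size I can get away with seems to track roughly 999/(number of columns + 1), presumably because of those insert/parameter limits, so something like this sketch could replace my hard-coded 100 (untested - the names match the code at the bottom):

def safe_chunksize(df):
    # empirically to_sql only seems happy when chunksize stays under 999 / (columns + 1),
    # presumably related to the 1000-rows-per-insert limit mentioned above
    return max(1, 999 // (len(df.columns) + 1))

# e.g. nasToFillDF.to_sql(name=tempTableName, con=engine.engine, if_exists='replace',
#                         index=False, chunksize=safe_chunksize(nasToFillDF))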
One workaround would be to use DataFrame.to_csv in append mode and then re-import the data once all the NAs have been filled forward, but that should be a last resort. (It's certainly doable: the source CSV file is 800 MB, and by the time I have filled in the gaps it will probably be about 20% bigger.)
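If I did go that way, the append step would presumably just be to_csv with mode='a', something like this (untested; the path is made up):

import os
import pandas as pd

def append_filled_chunk(df, path=r'C:\data\USDCHF_filled.csv'):   # path is just an example
    # write the header only if the file doesn't exist yet, then append without it
    write_header = not os.path.exists(path)
    df.to_csv(path, mode='a', header=write_header, index=False)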
Now, I have to do a lot of "signal processing" on the data as time series, for which Pandas seemed ideal; on the other hand I do also have Mathematica and can do some work there, but I would like to have a simple technology stack if possible.
So, given that I will have to make several passes through the whole DB, updating it with results, the question is:
Is there any hope of getting the desired >100x speed-up with one or more techniques combined with Python + Pandas + SQLAlchemy and an MSSQL backend... or with other Python tools?
(One idea: go the intermediate-CSV route and use SQLAlchemy to call a stored procedure to import the new CSV - except I have no idea how to do that; a rough guess at what the call might look like is sketched below.)
(NB: the PC is an i7-8700K with 32 GB RAM and a GPU - should I also be making better use of the hardware? Could I parallelise?)
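For that stored-procedure idea, my guess is something like the following - completely untested, the procedure name, DSN and path are made up, and I believe the CSV would have to be readable by the SQL Server service itself, not just by Python:

import sqlalchemy as sqla

engine = sqla.create_engine('mssql+pyodbc://@MyDSN')      # DSN name is a placeholder
csvPath = r'C:\data\USDCHF_filled.csv'                    # path is a placeholder

# the stored procedure (usp_ImportFilledCsv is a made-up name) would presumably wrap something like:
#   BULK INSERT [USDCHF Transformed] FROM 'C:\data\USDCHF_filled.csv'
#   WITH (FIELDTERMINATOR = ',', ROWTERMINATOR = '\n', FIRSTROW = 2)
with engine.begin() as conn:                              # begin() so the import gets committed
    conn.execute(sqla.text('EXEC dbo.usp_ImportFilledCsv @csvPath = :p'), p=csvPath)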
All suggestions gratefully received!
Hacky code showing how I am doing this follows (put together from a couple of files... apologies for any errors/omissions):
import sqlalchemy as sqla
from sqlalchemy import func, select
import datetime as dt
import pyodbc
import pandas as pd
import numpy as np
defaultDSN = 'MyMSSQLDSN'  # placeholder - in my real code the DSN name is defined in another file

def namedDbSqlAEngineCreate(dbName):
    # Create an engine and switch to the named db
    # returns the engine if successful and None if not
    engineStr = 'mssql+pyodbc://@' + defaultDSN
    engine = sqla.create_engine(engineStr, echo=False)
    try:
        engine.execute('USE ' + dbName)
        return engine
    except sqla.exc.SQLAlchemyError as ex:
        if ex.orig.args[0] == '08004':
            print('namedDbSqlAEngineCreate: Database %s does not exist' % dbName)
        else:
            print(ex.args[0])
        return None
def Main():
    amDebugging = False
    debugRows = 100
    dbName = 'USDCHF_Raw'
    srcCurrencyTableName = "USDCHF Transformed"
    engine = namedDbSqlAEngineCreate(dbName)  # won't work unless the db exists! (lives in another file in my real code)
    if engine is not None:
        session = sqla.orm.sessionmaker()  # do I need a session in this code??
        session.configure(bind=engine)
        meta = sqla.MetaData(engine)
        meta.reflect(bind=engine)  # get info about the database from the database, then we can create a table object directly
        srcCurrencyTable = meta.tables[srcCurrencyTableName]
        connection = engine.connect()
        rowCount = connection.scalar(select([func.count('*')]).select_from(srcCurrencyTable))  # now I'm referring to the table object directly
    else:
        return None
    chunkStart = 100000
    chunkRows = max(100000, 2)  # start small, but must be at least 2 :)
    chunkEnd = min(chunkStart + chunkRows - 1, rowCount)
    tempTableName = 'tempNasFilledTable'
    # and the SQL to copy from the temp table; only the City and Region columns need to be updated...
    updateSql = (' UPDATE [' + srcCurrencyTableName + ']' +
                 ' SET ' +
                 ' [City] = [' + tempTableName + '].[City],' +
                 ' [Region] = [' + tempTableName + '].[Region]' +
                 ' FROM [' + tempTableName + ']' +
                 ' WHERE [' + srcCurrencyTableName + '].[Rownumber] = [' + tempTableName + '].[Rownumber]')
    engine.execute(' USE ' + dbName)  # without this and con=engine.engine in to_sql, the table either doesn't get created or appears in the master db!
    print('Fill NA to database starting at ' + str(dt.datetime.now()))
    while True:
        # Prepare a selection; use a SQLAlchemy selectable rather than writing SQL as a string...
        s = select([srcCurrencyTable])  # selecting the table
        s = s.where(srcCurrencyTable.c.Rownumber >= chunkStart)  # chaining .where().where(), or use sqla.and_()
        s = s.where(srcCurrencyTable.c.Rownumber <= chunkEnd)  # adaptation to the table length is now in the chunkEnd update
        print('Dataframe load starting at ' + str(dt.datetime.now()))
        nasToFillDF = pd.read_sql_query(s, connection)
        print('Dataframe load finished at ' + str(dt.datetime.now()))
        if amDebugging:
            print(nasToFillDF.head(debugRows))
        if nasToFillDF.empty:  # hopefully I manage to read exactly to the end of the database, so this *shouldn't* happen...
            break
        else:
            print('FillNa starting at ' + str(dt.datetime.now()))
            nasToFillDF.fillna(method='ffill', inplace=True)  # must do it "inplace" - without it there's filling only if assigned to a new object
            print('FillNa finished at ' + str(dt.datetime.now()))
            print('tempTable write starting at ' + str(dt.datetime.now()))
            if amDebugging:
                print(nasToFillDF.head(debugRows))
            try:
                # updating from a dataframe directly doesn't seem to be possible/easy -
                # this link suggests using a temp table: https://stackoverflow.com/questions/45630095/how-to-update-a-db-table-from-pandas-dataset-with-sqlalchemy
                # for some strange reason I need both engine.execute to use the right db and engine.engine in to_sql
                # to_sql works with small tables but fails when chunksize is ~10000 - see this issue: https://github.com/mkleehammer/pyodbc/issues/250
                # there's a limit of 1000 rows per insert to MSSQL, and to_sql seems to have a chunksize limit of 999/(cols + 1)
                # the workaround is to re-export to CSV (!) with a file opened for append - see https://stackoverflow.com/questions/17134942/pandas-dataframe-output-end-of-csv,
                # https://stackoverflow.com/questions/1466000/python-open-built-in-function-difference-between-modes-a-a-w-w-and-r for Python file modes,
                # and https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_csv.html for the to_csv mode parameter
                nasToFillDF.to_sql(name=tempTableName, con=engine.engine, if_exists='replace', chunksize=100, index=False)  # create/replace the temp table
                print('tempTable write finished at ' + str(dt.datetime.now()))
                print('Main table update starting at ' + str(dt.datetime.now()))
                try:
                    connection.execute(updateSql)
                    print('Main table update finished at ' + str(dt.datetime.now()))
                    # ok, now advance by chunkRows - 1
                    chunkStart = min(chunkStart + chunkRows - 1, rowCount)
                    chunkEnd = min(chunkStart + chunkRows - 1, rowCount)
                    if chunkStart == rowCount:
                        break
                    if chunkStart == chunkEnd:
                        chunkStart = chunkEnd - 1
                except sqla.exc.SQLAlchemyError as serr:
                    print(serr.args[0])
                    break
            except sqla.exc.SQLAlchemyError as serr:
                print(serr.args[0])
                break
        print('Processed to row ' + str(chunkEnd) + ' at ' + str(dt.datetime.now()))
    print('Done at ' + str(dt.datetime.now()))


if __name__ == '__main__':
    Main()